Skip to content

Commit

Permalink
Add async_search get and delete APIs to HLRC (elastic#53828)
Browse files Browse the repository at this point in the history
This commit adds the "_async_searhc" get and delete APIs to the
AsyncSearchClient in the High Level Rest Client.

Relates to elastic#49091
Backport of elastic#53828
  • Loading branch information
Christoph Büscher committed Mar 23, 2020
1 parent 0528e2b commit 47fc864
Show file tree
Hide file tree
Showing 8 changed files with 341 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.asyncsearch.AsyncSearchResponse;
import org.elasticsearch.client.asyncsearch.DeleteAsyncSearchRequest;
import org.elasticsearch.client.asyncsearch.GetAsyncSearchRequest;
import org.elasticsearch.client.asyncsearch.SubmitAsyncSearchRequest;
import org.elasticsearch.client.core.AcknowledgedResponse;

import java.io.IOException;

Expand All @@ -42,7 +45,7 @@ public class AsyncSearchClient {
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public AsyncSearchResponse submitAsyncSearch(SubmitAsyncSearchRequest request, RequestOptions options) throws IOException {
public AsyncSearchResponse submit(SubmitAsyncSearchRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request, AsyncSearchRequestConverters::submitAsyncSearch, options,
AsyncSearchResponse::fromXContent, emptySet());
}
Expand All @@ -57,10 +60,61 @@ public AsyncSearchResponse submitAsyncSearch(SubmitAsyncSearchRequest request, R
* @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable submitAsyncSearchAsync(SubmitAsyncSearchRequest request, RequestOptions options,
public Cancellable submitAsync(SubmitAsyncSearchRequest request, RequestOptions options,
ActionListener<AsyncSearchResponse> listener) {
return restHighLevelClient.performRequestAsyncAndParseEntity(request, AsyncSearchRequestConverters::submitAsyncSearch, options,
AsyncSearchResponse::fromXContent, listener, emptySet());
}

/**
* Get an async search request.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/async-search.html"> the docs</a> for more.
*
*/
public AsyncSearchResponse get(GetAsyncSearchRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request, AsyncSearchRequestConverters::getAsyncSearch, options,
AsyncSearchResponse::fromXContent, emptySet());
}

/**
* Asynchronously get an async search request.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/async-search.html"> the docs</a> for more.
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable getAsync(GetAsyncSearchRequest request, RequestOptions options,
ActionListener<AsyncSearchResponse> listener) {
return restHighLevelClient.performRequestAsyncAndParseEntity(request, AsyncSearchRequestConverters::getAsyncSearch, options,
AsyncSearchResponse::fromXContent, listener, emptySet());
}

/**
* Delete an async search request.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/async-search.html"> the docs</a> for more.
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public AcknowledgedResponse delete(DeleteAsyncSearchRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request, AsyncSearchRequestConverters::deleteAsyncSearch, options,
AcknowledgedResponse::fromXContent, emptySet());
}

/**
* Asynchronously delete an async search request.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/async-search.html"> the docs</a> for more.
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable deleteAsync(DeleteAsyncSearchRequest request, RequestOptions options,
ActionListener<AcknowledgedResponse> listener) {
return restHighLevelClient.performRequestAsyncAndParseEntity(request, AsyncSearchRequestConverters::deleteAsyncSearch, options,
AcknowledgedResponse::fromXContent, listener, emptySet());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@

package org.elasticsearch.client;

import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.client.RequestConverters.Params;
import org.elasticsearch.client.asyncsearch.DeleteAsyncSearchRequest;
import org.elasticsearch.client.asyncsearch.GetAsyncSearchRequest;
import org.elasticsearch.client.asyncsearch.SubmitAsyncSearchRequest;
import org.elasticsearch.rest.action.search.RestSearchAction;

Expand Down Expand Up @@ -71,4 +75,29 @@ static void addSearchRequestParams(Params params, SubmitAsyncSearchRequest reque
}
params.withBatchedReduceSize(request.getBatchedReduceSize());
}

static Request getAsyncSearch(GetAsyncSearchRequest asyncSearchRequest) throws IOException {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_async_search")
.addPathPart(asyncSearchRequest.getId())
.build();
Request request = new Request(HttpGet.METHOD_NAME, endpoint);
Params params = new RequestConverters.Params();
if (asyncSearchRequest.getKeepAlive() != null) {
params.putParam("keep_alive", asyncSearchRequest.getKeepAlive().getStringRep());
}
if (asyncSearchRequest.getWaitForCompletion() != null) {
params.putParam("wait_for_completion", asyncSearchRequest.getWaitForCompletion().getStringRep());
}
request.addParameters(params.asMap());
return request;
}

static Request deleteAsyncSearch(DeleteAsyncSearchRequest deleteAsyncSearchRequest) throws IOException {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_async_search")
.addPathPart(deleteAsyncSearchRequest.getId())
.build();
return new Request(HttpDelete.METHOD_NAME, endpoint);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/


package org.elasticsearch.client.asyncsearch;

import org.elasticsearch.client.Validatable;

import java.util.Objects;

public class DeleteAsyncSearchRequest implements Validatable {

private final String id;

public DeleteAsyncSearchRequest(String id) {
this.id = id;
}

public String getId() {
return this.id;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DeleteAsyncSearchRequest request = (DeleteAsyncSearchRequest) o;
return Objects.equals(getId(), request.getId());
}

@Override
public int hashCode() {
return Objects.hash(getId());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/


package org.elasticsearch.client.asyncsearch;

import org.elasticsearch.client.Validatable;
import org.elasticsearch.client.ValidationException;
import org.elasticsearch.common.unit.TimeValue;

import java.util.Objects;
import java.util.Optional;

public class GetAsyncSearchRequest implements Validatable {

private TimeValue waitForCompletion;
private TimeValue keepAlive;

public static final long MIN_KEEPALIVE = TimeValue.timeValueMinutes(1).millis();

private final String id;

public GetAsyncSearchRequest(String id) {
this.id = id;
}

public String getId() {
return this.id;
}

public TimeValue getWaitForCompletion() {
return waitForCompletion;
}

public void setWaitForCompletion(TimeValue waitForCompletion) {
this.waitForCompletion = waitForCompletion;
}

public TimeValue getKeepAlive() {
return keepAlive;
}

public void setKeepAlive(TimeValue keepAlive) {
this.keepAlive = keepAlive;
}

@Override
public Optional<ValidationException> validate() {
final ValidationException validationException = new ValidationException();
if (keepAlive != null && keepAlive.getMillis() < MIN_KEEPALIVE) {
validationException.addValidationError("keep_alive must be greater than 1 minute, got: " + keepAlive.toString());
}
if (validationException.validationErrors().isEmpty()) {
return Optional.empty();
}
return Optional.of(validationException);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
GetAsyncSearchRequest request = (GetAsyncSearchRequest) o;
return Objects.equals(getId(), request.getId())
&& Objects.equals(getKeepAlive(), request.getKeepAlive())
&& Objects.equals(getWaitForCompletion(), request.getWaitForCompletion());
}

@Override
public int hashCode() {
return Objects.hash(getId(), getKeepAlive(), getWaitForCompletion());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@

package org.elasticsearch.client;

import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.asyncsearch.DeleteAsyncSearchRequest;
import org.elasticsearch.client.asyncsearch.GetAsyncSearchRequest;
import org.elasticsearch.client.asyncsearch.SubmitAsyncSearchRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -112,4 +116,35 @@ private static void setRandomSearchParams(SubmitAsyncSearchRequest request, Map<
expectedParams.put("max_concurrent_shard_requests", Integer.toString(request.getMaxConcurrentShardRequests()));
}

public void testGetAsyncSearch() throws Exception {
String id = randomAlphaOfLengthBetween(5, 10);
Map<String, String> expectedParams = new HashMap<>();
GetAsyncSearchRequest submitRequest = new GetAsyncSearchRequest(id);
if (randomBoolean()) {
TimeValue keepAlive = TimeValue.parseTimeValue(randomTimeValue(), "test");
submitRequest.setKeepAlive(keepAlive);
expectedParams.put("keep_alive", keepAlive.getStringRep());
}
if (randomBoolean()) {
TimeValue waitForCompletion = TimeValue.parseTimeValue(randomTimeValue(), "test");
submitRequest.setWaitForCompletion(waitForCompletion);
expectedParams.put("wait_for_completion", waitForCompletion.getStringRep());
}

Request request = AsyncSearchRequestConverters.getAsyncSearch(submitRequest);
String endpoint = "/_async_search/" + id;
assertEquals(HttpGet.METHOD_NAME, request.getMethod());
assertEquals(endpoint.toString(), request.getEndpoint());
assertEquals(expectedParams, request.getParameters());
}

public void testDeleteAsyncSearch() throws Exception {
String id = randomAlphaOfLengthBetween(5, 10);
DeleteAsyncSearchRequest deleteRequest = new DeleteAsyncSearchRequest(id);

Request request = AsyncSearchRequestConverters.deleteAsyncSearch(deleteRequest);
assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
assertEquals("/_async_search/" + id, request.getEndpoint());
assertTrue(request.getParameters().isEmpty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,37 +21,50 @@

import org.elasticsearch.client.ESRestHighLevelClientTestCase;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.core.AcknowledgedResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class AsyncSearchIT extends ESRestHighLevelClientTestCase {

public void testSubmitAsyncSearchRequest() throws IOException {
public void testAsyncSearch() throws IOException {
String index = "test-index";
createIndex(index, Settings.EMPTY);

SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(sourceBuilder, index);
// 15 sec should be enough to make sure we always complete right away
request.setWaitForCompletion(new TimeValue(15, TimeUnit.SECONDS));
AsyncSearchResponse response = highLevelClient().asyncSearch().submitAsyncSearch(request, RequestOptions.DEFAULT);
assertTrue(response.getVersion() >= 0);
assertFalse(response.isPartial());
assertTrue(response.getStartTime() > 0);
assertTrue(response.getExpirationTime() > 0);
assertNotNull(response.getSearchResponse());
if (response.isRunning() == false) {
assertNull(response.getId());
assertFalse(response.isPartial());
SubmitAsyncSearchRequest submitRequest = new SubmitAsyncSearchRequest(sourceBuilder, index);
submitRequest.setCleanOnCompletion(false);
AsyncSearchResponse submitResponse = highLevelClient().asyncSearch().submit(submitRequest, RequestOptions.DEFAULT);
assertNotNull(submitResponse.getId());
assertFalse(submitResponse.isPartial());
assertTrue(submitResponse.getStartTime() > 0);
assertTrue(submitResponse.getExpirationTime() > 0);
assertNotNull(submitResponse.getSearchResponse());
if (submitResponse.isRunning() == false) {
assertFalse(submitResponse.isPartial());
} else {
assertTrue(response.isPartial());
assertNotNull(response.getId());
assertTrue(submitResponse.isPartial());
}

GetAsyncSearchRequest getRequest = new GetAsyncSearchRequest(submitResponse.getId());
AsyncSearchResponse getResponse = highLevelClient().asyncSearch().get(getRequest, RequestOptions.DEFAULT);
while (getResponse.isRunning()) {
getResponse = highLevelClient().asyncSearch().get(getRequest, RequestOptions.DEFAULT);
}
}

assertFalse(getResponse.isRunning());
assertFalse(getResponse.isPartial());
assertTrue(getResponse.getStartTime() > 0);
assertTrue(getResponse.getExpirationTime() > 0);
assertNotNull(getResponse.getSearchResponse());

DeleteAsyncSearchRequest deleteRequest = new DeleteAsyncSearchRequest(submitResponse.getId());
AcknowledgedResponse deleteAsyncSearchResponse = highLevelClient().asyncSearch().delete(deleteRequest,
RequestOptions.DEFAULT);
assertNotNull(deleteAsyncSearchResponse);
assertNotNull(deleteAsyncSearchResponse.isAcknowledged());
}
}
Loading

0 comments on commit 47fc864

Please sign in to comment.