Skip to content

Commit

Permalink
SQL: fix use of requestTimeout and pageTimeout query parameters (#79360)
Browse files Browse the repository at this point in the history
Resolves #72151 The _sql endpoint offers a `page_timeout` parameter for
customizing how long scroll contexts should be kept open (if needed) and
a `request_timeout` parameter which the docs describe as "Timeout before
the request fails.". Currently, the value of the `page_timeout`
parameter is used as the `timeout` in subsequent _search requests and
not as the timeout in the `scroll` configuration. For the `scroll`
configuration, SQL uses the `request_timeout` parameter. This PR
addresses the issue by swapping the uses of `page_timeout` and
`request_timeout` in querier. Additionally, the PR removes some unused
artifacts that might have caused some confusion: - The `timeout` and
`keepAlive` fields in `Querier`. Instead, `Querier` directly uses the
according fields in `SqlConfiguration`. - The `SqlConfiguration`
parameter from `ScrollCursor.clear`, it's not used but required an
instance of `SqlConfiguration` with all default values. - One overloaded
constructor of `SqlConfiguration` that was only used for calling
`ScrollCursor.clear` (and some tests) and used default values for an
(arbitrary?) subset of the fields. - The fields related to async
requests in `SqlConfiguration`. I'm a bit unsure about this one but the
fields are never read and it does not seem like an SQL specific concern.
The whole creation of the async tasks is handled in
`TransportSqlQueryAction` and the downstream components do not require
the information.
  • Loading branch information
Lukas Wegmann committed Oct 27, 2021
1 parent ee1f71d commit 9e66494
Show file tree
Hide file tree
Showing 22 changed files with 179 additions and 109 deletions.
7 changes: 5 additions & 2 deletions docs/reference/sql/apis/sql-search-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,11 @@ searches>> if you also specify the `wait_for_completion_timeout` parameter. If
the `wait_for_completion_timeout`. Defaults to `false`.

`page_timeout`::
(Optional, <<time-units,time value>>) Timeout before a
<<sql-pagination,pagination request>> fails. Defaults to `45s` (45 seconds).
(Optional, <<time-units,time value>>) Minimum retention period for the scroll
cursor. After this time period, a <<sql-pagination,pagination request>> might
fail because the scroll cursor is no longer available. Subsequent scroll requests
prolong the lifetime of the scroll cursor by the duration of `page_timeout` in
the scroll request. Defaults to `45s` (45 seconds).

`params`::
(Optional, array) Values for parameters in the `query`. For syntax, see
Expand Down
8 changes: 5 additions & 3 deletions docs/reference/sql/endpoints/jdbc.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,14 @@ Connection timeout (in milliseconds). That is the maximum amount of time waiting
`network.timeout` (default `60000`)::
Network timeout (in milliseconds). That is the maximum amount of time waiting for the network.

`page.timeout` (default `45000`)::
Page timeout (in milliseconds). That is the maximum amount of time waiting for a page.

`page.size` (default `1000`)::
Page size (in entries). The number of results returned per page by the server.

`page.timeout` (default `45000`)::
Page timeout (in milliseconds). Minimum retention period for the scroll cursor on the server. Queries that require
a stateful scroll cursor on the server side might fail after this timeout. Hence, when scrolling through large result sets,
processing `page.size` records should not take longer than `page.timeout` milliseconds.

`query.timeout` (default `90000`)::
Query timeout (in milliseconds). That is the maximum amount of time waiting for a query to return.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.sql.action;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.SearchService;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.instanceOf;

public class SqlSearchPageTimeoutIT extends AbstractSqlIntegTestCase {

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
// use static low keepAlive interval to ensure obsolete search contexts are pruned soon enough
settings.put(SearchService.KEEPALIVE_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(200));
return settings.build();
}

public void testSearchContextIsCleanedUpAfterPageTimeoutForHitsQueries() throws Exception {
setupTestIndex();

SqlQueryResponse response = new SqlQueryRequestBuilder(client(), SqlQueryAction.INSTANCE).query("SELECT field FROM test")
.fetchSize(1)
.pageTimeout(TimeValue.timeValueMillis(100))
.get();

assertEquals(1, response.size());
assertTrue(response.hasCursor());
assertEquals(1, getNumberOfSearchContexts());

assertBusy(() -> assertEquals(0, getNumberOfSearchContexts()), 3, TimeUnit.SECONDS);

SearchPhaseExecutionException exception = expectThrows(
SearchPhaseExecutionException.class,
() -> new SqlQueryRequestBuilder(client(), SqlQueryAction.INSTANCE).cursor(response.cursor()).get()
);

assertThat(Arrays.asList(exception.guessRootCauses()), contains(instanceOf(SearchContextMissingException.class)));
}

public void testNoSearchContextForAggregationQueries() throws InterruptedException {
setupTestIndex();

SqlQueryResponse response = new SqlQueryRequestBuilder(client(), SqlQueryAction.INSTANCE).query(
"SELECT COUNT(*) FROM test GROUP BY field"
).fetchSize(1).pageTimeout(TimeValue.timeValueMillis(100)).get();

assertEquals(1, response.size());
assertTrue(response.hasCursor());
assertEquals(0, getNumberOfSearchContexts());

Thread.sleep(1000);

// since aggregation queries do not have a stateful search context, scrolling is still possible after page_timeout
response = new SqlQueryRequestBuilder(client(), SqlQueryAction.INSTANCE).cursor(response.cursor()).get();

assertEquals(1, response.size());
}

private void setupTestIndex() {
assertAcked(client().admin().indices().prepareCreate("test").get());
client().prepareBulk()
.add(new IndexRequest("test").id("1").source("field", "bar"))
.add(new IndexRequest("test").id("2").source("field", "baz"))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
ensureYellow("test");
}

private long getNumberOfSearchContexts() {
return client().admin()
.indices()
.prepareStats("test")
.clear()
.setSearch(true)
.get()
.getIndex("test")
.getTotal()
.getSearch()
.getOpenContexts();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ public void nextPage(SqlConfiguration cfg, Cursor cursor, ActionListener<Page> l
}));
}

public void cleanCursor(SqlConfiguration cfg, Cursor cursor, ActionListener<Boolean> listener) {
cursor.clear(cfg, client, listener);
public void cleanCursor(Cursor cursor, ActionListener<Boolean> listener) {
cursor.clear(client, listener);
}

public Client client() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
import org.elasticsearch.xpack.ql.util.StringUtils;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.querydsl.agg.Aggs;
import org.elasticsearch.xpack.sql.session.SqlConfiguration;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SqlConfiguration;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -133,7 +133,7 @@ public void nextPage(SqlConfiguration cfg, Client client, NamedWriteableRegistry
log.trace("About to execute composite query {} on {}", StringUtils.toString(query), indices);
}

SearchRequest request = Querier.prepareRequest(client, query, cfg.pageTimeout(), includeFrozen, indices);
SearchRequest request = Querier.prepareRequest(query, cfg.requestTimeout(), includeFrozen, indices);

client.search(request, new ActionListener.Delegating<>(listener) {
@Override
Expand Down Expand Up @@ -267,7 +267,7 @@ private static byte[] serializeQuery(SearchSourceBuilder source) throws IOExcept


@Override
public void clear(SqlConfiguration cfg, Client client, ActionListener<Boolean> listener) {
public void clear(Client client, ActionListener<Boolean> listener) {
listener.onResponse(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
Expand All @@ -28,6 +27,7 @@
import org.elasticsearch.search.aggregations.bucket.filter.Filters;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.ql.execution.search.FieldExtraction;
import org.elasticsearch.xpack.ql.execution.search.extractor.BucketExtractor;
import org.elasticsearch.xpack.ql.execution.search.extractor.ComputingExtractor;
Expand Down Expand Up @@ -93,7 +93,6 @@ public class Querier {

private final PlanExecutor planExecutor;
private final SqlConfiguration cfg;
private final TimeValue keepAlive, timeout;
private final int size;
private final Client client;
@Nullable
Expand All @@ -103,21 +102,14 @@ public Querier(SqlSession sqlSession) {
this.planExecutor = sqlSession.planExecutor();
this.client = sqlSession.client();
this.cfg = sqlSession.configuration();
this.keepAlive = cfg.requestTimeout();
this.timeout = cfg.pageTimeout();
this.filter = cfg.filter();
this.size = cfg.pageSize();
}

public void query(List<Attribute> output, QueryContainer query, String index, ActionListener<Page> listener) {
// prepare the request
SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(query, filter, size);
// set query timeout
if (timeout.getSeconds() > 0) {
sourceBuilder.timeout(timeout);
}

// set runtime mappings
if (this.cfg.runtimeMappings() != null) {
sourceBuilder.runtimeMappings(this.cfg.runtimeMappings());
}
Expand All @@ -126,7 +118,7 @@ public void query(List<Attribute> output, QueryContainer query, String index, Ac
log.trace("About to execute query {} on {}", StringUtils.toString(sourceBuilder), index);
}

SearchRequest search = prepareRequest(client, sourceBuilder, timeout, query.shouldIncludeFrozen(),
SearchRequest search = prepareRequest(sourceBuilder, cfg.requestTimeout(), query.shouldIncludeFrozen(),
Strings.commaDelimitedListToStringArray(index));

@SuppressWarnings("rawtypes")
Expand All @@ -141,7 +133,7 @@ public void query(List<Attribute> output, QueryContainer query, String index, Ac
l = new CompositeActionListener(listener, client, cfg, output, query, search);
}
} else {
search.scroll(keepAlive);
search.scroll(cfg.pageTimeout());
l = new ScrollActionListener(listener, client, cfg, output, query);
}

Expand All @@ -152,7 +144,7 @@ public void query(List<Attribute> output, QueryContainer query, String index, Ac
client.search(search, l);
}

public static SearchRequest prepareRequest(Client client, SearchSourceBuilder source, TimeValue timeout, boolean includeFrozen,
public static SearchRequest prepareRequest(SearchSourceBuilder source, TimeValue timeout, boolean includeFrozen,
String... indices) {
source.timeout(timeout);

Expand Down Expand Up @@ -534,15 +526,13 @@ abstract static class BaseActionListener extends ActionListener.Delegating<Searc

final Client client;
final SqlConfiguration cfg;
final TimeValue keepAlive;
final Schema schema;

BaseActionListener(ActionListener<Page> listener, Client client, SqlConfiguration cfg, List<Attribute> output) {
super(listener);

this.client = client;
this.cfg = cfg;
this.keepAlive = cfg.requestTimeout();
this.schema = Rows.schema(output);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor;
import org.elasticsearch.xpack.ql.type.Schema;
import org.elasticsearch.xpack.sql.session.SqlConfiguration;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SqlConfiguration;

import java.io.IOException;
import java.util.BitSet;
Expand Down Expand Up @@ -100,13 +100,13 @@ public void nextPage(SqlConfiguration cfg, Client client, NamedWriteableRegistry
client.searchScroll(request, wrap(response -> {
handle(response, () -> new SearchHitRowSet(extractors, mask, limit, response),
p -> listener.onResponse(p),
p -> clear(cfg, client, wrap(success -> listener.onResponse(p), listener::onFailure)),
p -> clear(client, wrap(success -> listener.onResponse(p), listener::onFailure)),
Schema.EMPTY);
}, listener::onFailure));
}

@Override
public void clear(SqlConfiguration cfg, Client client, ActionListener<Boolean> listener) {
public void clear(Client client, ActionListener<Boolean> listener) {
cleanCursor(client, scrollId, wrap(
clearScrollResponse -> listener.onResponse(clearScrollResponse.isSucceeded()),
listener::onFailure));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.sql.action.BasicFormatter;
import org.elasticsearch.xpack.sql.session.SqlConfiguration;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.SqlConfiguration;

import java.io.IOException;
import java.util.Objects;
Expand Down Expand Up @@ -59,8 +59,8 @@ public void nextPage(SqlConfiguration cfg, Client client, NamedWriteableRegistry
}

@Override
public void clear(SqlConfiguration cfg, Client client, ActionListener<Boolean> listener) {
delegate.clear(cfg, client, listener);
public void clear(Client client, ActionListener<Boolean> listener) {
delegate.clear(client, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,12 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ql.util.StringUtils;
import org.elasticsearch.xpack.sql.action.SqlClearCursorRequest;
import org.elasticsearch.xpack.sql.action.SqlClearCursorResponse;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.proto.Protocol;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.Cursors;
import org.elasticsearch.xpack.sql.session.SqlConfiguration;
import org.elasticsearch.xpack.sql.util.DateUtils;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.xpack.sql.action.SqlClearCursorAction.NAME;

public class TransportSqlClearCursorAction extends HandledTransportAction<SqlClearCursorRequest, SqlClearCursorResponse> {
Expand All @@ -47,11 +42,9 @@ public static void operation(PlanExecutor planExecutor, SqlClearCursorRequest re
ActionListener<SqlClearCursorResponse> listener) {
Cursor cursor = Cursors.decodeFromStringWithZone(request.getCursor()).v1();
planExecutor.cleanCursor(
new SqlConfiguration(DateUtils.UTC, null, Protocol.FETCH_SIZE, Protocol.REQUEST_TIMEOUT, Protocol.PAGE_TIMEOUT, null,
emptyMap(), request.mode(), StringUtils.EMPTY, request.version(), StringUtils.EMPTY, StringUtils.EMPTY,
Protocol.FIELD_MULTI_VALUE_LENIENCY, Protocol.INDEX_INCLUDE_FROZEN),
cursor, ActionListener.wrap(
success -> listener.onResponse(new SqlClearCursorResponse(success)), listener::onFailure));
cursor,
ActionListener.<Boolean>wrap(success -> listener.onResponse(new SqlClearCursorResponse(success)), listener::onFailure)
);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -112,8 +112,7 @@ public static void operation(PlanExecutor planExecutor, SqlQueryTask task, SqlQu
SqlConfiguration cfg = new SqlConfiguration(request.zoneId(), request.catalog(), request.fetchSize(), request.requestTimeout(),
request.pageTimeout(), request.filter(), request.runtimeMappings(), request.mode(), request.clientId(), request.version(),
username, clusterName(clusterService), request.fieldMultiValueLeniency(), request.indexIncludeFrozen(),
new TaskId(clusterService.localNode().getId(), task.getId()), task,
request.waitForCompletionTimeout(), request.keepOnCompletion(), request.keepAlive());
new TaskId(clusterService.localNode().getId(), task.getId()), task);

if (Strings.hasText(request.cursor()) == false) {
executeRequestWithRetryAttempt(clusterService, listener::onFailure,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ protected void doExecute(Task task, SqlTranslateRequest request, ActionListener<
request.requestTimeout(), request.pageTimeout(), request.filter(), request.runtimeMappings(),
request.mode(), request.clientId(), request.version(),
username(securityContext), clusterName(clusterService), Protocol.FIELD_MULTI_VALUE_LENIENCY,
Protocol.INDEX_INCLUDE_FROZEN);
Protocol.INDEX_INCLUDE_FROZEN, null, null);

planExecutor.searchSource(cfg, request.query(), request.params(), ActionListener.wrap(
searchSourceBuilder -> listener.onResponse(new SqlTranslateResponse(searchSourceBuilder)), listener::onFailure));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,5 @@ public static Page last(RowSet rowSet) {
/**
* Cleans the resources associated with the cursor
*/
void clear(SqlConfiguration cfg, Client client, ActionListener<Boolean> listener);
void clear(Client client, ActionListener<Boolean> listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void nextPage(SqlConfiguration cfg, Client client, NamedWriteableRegistry
}

@Override
public void clear(SqlConfiguration cfg, Client client, ActionListener<Boolean> listener) {
public void clear(Client client, ActionListener<Boolean> listener) {
// There is nothing to clean
listener.onResponse(false);
}
Expand Down

0 comments on commit 9e66494

Please sign in to comment.