Skip to content

Commit

Permalink
SQL: Fix metrics reporting when sorting on aggregated values (#81510) (
Browse files Browse the repository at this point in the history
…#81638)

Resolves #81502

The bug has been caused by Querier using PlanExecutor.nextPage for internal calls. Because PlanExecutor.nextPage also reports the API usage metrics, the <clientType>.paging, <clientType>.total, _all.paging and _all.total counters have been increased incorrectly for queries performing a sort on aggregated values.
  • Loading branch information
Lukas Wegmann committed Dec 13, 2021
1 parent 5b38441 commit a4b007e
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.elasticsearch.xpack.sql.proto.Protocol.SQL_QUERY_REST_ENDPOINT;
import static org.elasticsearch.xpack.sql.proto.Protocol.SQL_STATS_REST_ENDPOINT;
import static org.elasticsearch.xpack.sql.proto.Protocol.SQL_TRANSLATE_REST_ENDPOINT;
import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.toMap;
import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.query;

public abstract class RestSqlUsageTestCase extends ESRestTestCase {
Expand Down Expand Up @@ -59,11 +60,14 @@ public String toString() {

private Map<String, Integer> baseMetrics = new HashMap<String, Integer>();
private Integer baseClientTypeTotalQueries = 0;
private Integer baseClientTypePagingQueries = 0;
private Integer baseClientTypeFailedQueries = 0;
private Integer baseAllTotalQueries = 0;
private Integer baseAllPagingQueries = 0;
private Integer baseAllFailedQueries = 0;
private Integer baseTranslateRequests = 0;
private String clientType;
private String mode;
private boolean ignoreClientType;

/**
Expand All @@ -87,6 +91,16 @@ private void getBaseMetrics() throws UnsupportedOperationException, IOException
clientType = ClientType.REST.toString();
}

if (clientType.equals(ClientType.JDBC.toString())) {
mode = Mode.JDBC.toString();
} else if (clientType.startsWith(ClientType.ODBC.toString())) {
mode = Mode.ODBC.toString();
} else if (clientType.equals(ClientType.CLI.toString())) {
mode = Mode.CLI.toString();
} else {
mode = Mode.PLAIN.toString();
}

for (Map perNodeStats : nodesListStats) {
Map featuresMetrics = (Map) ((Map) perNodeStats.get("stats")).get("features");
Map queriesMetrics = (Map) ((Map) perNodeStats.get("stats")).get("queries");
Expand All @@ -96,8 +110,10 @@ private void getBaseMetrics() throws UnsupportedOperationException, IOException

// initialize the "base" metric values with whatever values are already recorded on ES
baseClientTypeTotalQueries = ((Map<String, Integer>) queriesMetrics.get(clientType)).get("total");
baseClientTypePagingQueries = ((Map<String, Integer>) queriesMetrics.get(clientType)).get("paging");
baseClientTypeFailedQueries = ((Map<String, Integer>) queriesMetrics.get(clientType)).get("failed");
baseAllTotalQueries = ((Map<String, Integer>) queriesMetrics.get("_all")).get("total");
baseAllPagingQueries = ((Map<String, Integer>) queriesMetrics.get("_all")).get("paging");
baseAllFailedQueries = ((Map<String, Integer>) queriesMetrics.get("_all")).get("failed");
baseTranslateRequests = ((Map<String, Integer>) queriesMetrics.get("translate")).get("count");
}
Expand Down Expand Up @@ -224,6 +240,47 @@ public void testSqlRestUsage() throws IOException {
assertClientTypeAndAllQueryMetrics(clientTypeTotalQueries, allTotalQueries, responseAsMap);
}

public void testScrollUsage() throws IOException {
index(testData);

String cursor = runSql("SELECT page_count, name FROM library ORDER BY page_count", randomIntBetween(1, testData.size()));
int scrollRequests = 0;

while (cursor != null) {
cursor = scroll(cursor);
scrollRequests++;
}

Map<String, Object> responseAsMap = getStats();
assertClientTypeQueryMetric(baseClientTypeTotalQueries + scrollRequests + 1, responseAsMap, "total");
assertClientTypeQueryMetric(baseClientTypePagingQueries + scrollRequests, responseAsMap, "paging");

assertAllQueryMetric(baseAllTotalQueries + scrollRequests + 1, responseAsMap, "total");
assertAllQueryMetric(baseAllPagingQueries + scrollRequests, responseAsMap, "paging");

assertFeatureMetric(baseMetrics.get("orderby") + 1, responseAsMap, "orderby");
}

// test for bug https://github.com/elastic/elasticsearch/issues/81502
public void testUsageOfQuerySortedByAggregationResult() throws IOException {
index(testData);

String cursor = runSql("SELECT SUM(page_count), name FROM library GROUP BY 2 ORDER BY 1", 1);

Map<String, Object> responseAsMap = getStats();
assertClientTypeQueryMetric(baseClientTypeTotalQueries + 1, responseAsMap, "total");
assertClientTypeQueryMetric(baseClientTypePagingQueries, responseAsMap, "paging");
assertAllQueryMetric(baseAllTotalQueries + 1, responseAsMap, "total");
assertAllQueryMetric(baseAllPagingQueries, responseAsMap, "paging");

scroll(cursor);
responseAsMap = getStats();
assertClientTypeQueryMetric(baseClientTypeTotalQueries + 2, responseAsMap, "total");
assertClientTypeQueryMetric(baseClientTypePagingQueries + 1, responseAsMap, "paging");
assertAllQueryMetric(baseAllTotalQueries + 2, responseAsMap, "total");
assertAllQueryMetric(baseAllPagingQueries + 1, responseAsMap, "paging");
}

private void assertClientTypeAndAllQueryMetrics(int clientTypeTotalQueries, int allTotalQueries, Map<String, Object> responseAsMap)
throws IOException {
assertClientTypeQueryMetric(clientTypeTotalQueries, responseAsMap, "total");
Expand Down Expand Up @@ -277,17 +334,16 @@ private void runTranslate(String sql) throws IOException {
client().performRequest(request);
}

private void runSql(String sql) throws IOException {
Mode mode = Mode.PLAIN;
if (clientType.equals(ClientType.JDBC.toString())) {
mode = Mode.JDBC;
} else if (clientType.startsWith(ClientType.ODBC.toString())) {
mode = Mode.ODBC;
} else if (clientType.equals(ClientType.CLI.toString())) {
mode = Mode.CLI;
}
private String runSql(String sql) throws IOException {
return runSql(sql, null);
}

runSql(mode.toString(), clientType, sql);
private String scroll(String cursor) throws IOException {
Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT);
request.setEntity(
new StringEntity(RestSqlTestCase.cursor(cursor).mode(mode).clientId(clientType).toString(), ContentType.APPLICATION_JSON)
);
return (String) toMap(client().performRequest(request), mode).get("cursor");
}

@SuppressWarnings({ "unchecked", "rawtypes" })
Expand All @@ -302,7 +358,7 @@ private void assertTranslateQueryMetric(int expected, Map<String, Object> respon
assertEquals(expected, actualMetricValue);
}

private void runSql(String mode, String restClient, String sql) throws IOException {
private String runSql(String sql, Integer fetchSize) throws IOException {
Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT);
request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server.
request.addParameter("pretty", "true"); // Improves error reporting readability
Expand All @@ -318,11 +374,11 @@ private void runSql(String mode, String restClient, String sql) throws IOExcepti
}
request.setEntity(
new StringEntity(
query(sql).mode(mode).clientId(ignoreClientType ? StringUtils.EMPTY : restClient).toString(),
query(sql).fetchSize(fetchSize).mode(mode).clientId(ignoreClientType ? StringUtils.EMPTY : clientType).toString(),
ContentType.APPLICATION_JSON
)
);
client().performRequest(request);
return (String) toMap(client().performRequest(request), mode).get("cursor");
}

@SuppressWarnings({ "unchecked", "rawtypes" })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,19 @@ public void nextPage(SqlConfiguration cfg, Cursor cursor, ActionListener<Page> l
metrics.total(metric);
metrics.paging(metric);

cursor.nextPage(cfg, client, writableRegistry, wrap(listener::onResponse, ex -> {
nextPageInternal(cfg, cursor, wrap(listener::onResponse, ex -> {
metrics.failed(metric);
listener.onFailure(ex);
}));
}

/**
* `nextPage` for internal callers (not from the APIs) without metrics reporting.
*/
public void nextPageInternal(SqlConfiguration cfg, Cursor cursor, ActionListener<Page> listener) {
cursor.nextPage(cfg, client, writableRegistry, listener);
}

public void cleanCursor(Cursor cursor, ActionListener<Boolean> listener) {
cursor.clear(client, listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
Expand Down Expand Up @@ -91,24 +89,19 @@
public class Querier {
private static final Logger log = LogManager.getLogger(Querier.class);

private final PlanExecutor planExecutor;
private final SqlConfiguration cfg;
private final int size;
private final Client client;
@Nullable
private final QueryBuilder filter;

public Querier(SqlSession sqlSession) {
this.planExecutor = sqlSession.planExecutor();
this.client = sqlSession.client();
this.cfg = sqlSession.configuration();
this.filter = cfg.filter();
this.size = cfg.pageSize();
private final PlanExecutor planExecutor;

public Querier(SqlSession session) {
this.client = session.client();
this.planExecutor = session.planExecutor();
this.cfg = session.configuration();
}

public void query(List<Attribute> output, QueryContainer query, String index, ActionListener<Page> listener) {
// prepare the request
SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(query, filter, size);
SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(query, cfg.filter(), cfg.pageSize());

if (this.cfg.runtimeMappings() != null) {
sourceBuilder.runtimeMappings(this.cfg.runtimeMappings());
Expand Down Expand Up @@ -244,7 +237,7 @@ public void onResponse(Page page) {
// 1a. trigger a next call if there's still data
if (cursor != Cursor.EMPTY) {
// trigger a next call
planExecutor.nextPage(cfg, cursor, this);
planExecutor.nextPageInternal(cfg, cursor, this);
// make sure to bail out afterwards as we'll get called by a different thread
return;
}
Expand Down

0 comments on commit a4b007e

Please sign in to comment.