Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import static org.elasticsearch.xpack.esql.qa.single_node.RestEsqlIT.commonProfile;
import static org.elasticsearch.xpack.esql.qa.single_node.RestEsqlIT.fixTypesOnProfile;
import static org.hamcrest.Matchers.any;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.startsWith;
Expand All @@ -54,8 +55,9 @@
*/
@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
public class PushExpressionToLoadIT extends ESRestTestCase {

@ClassRule
public static ElasticsearchCluster cluster = Clusters.testCluster(spec -> spec.plugin("inference-service-test"));
public static ElasticsearchCluster cluster = Clusters.testCluster();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No inference needed, simplifying


// Handed to runEsql(...) by the test helper further down in this file.
// NOTE(review): in JUnit 4.13 rules with a lower `order` wrap those with a higher one, so
// Integer.MIN_VALUE presumably makes this the outermost rule - confirm.
@Rule(order = Integer.MIN_VALUE)
public ProfileLogger profileLogger = new ProfileLogger();
Expand Down Expand Up @@ -365,6 +367,96 @@ public void testVHammingToBit() throws IOException {
);
}

//
// Tests without STATS at the end - check that node_reduce phase works correctly
//
/**
 * Indexes a single keyword value, runs {@code LENGTH} over it in a query that ends with
 * {@code LIMIT} and {@code KEEP} instead of {@code STATS}, and checks the returned row, the
 * two returned columns and the field loaders expected for the {@code data} driver.
 */
public void testLengthPushedWithoutTopN() throws IOException {
    String textValue = "v".repeat(between(0, 256));

    CheckedConsumer<XContentBuilder, IOException> mapping = b -> b.startObject("test").field("type", "keyword").endObject();
    CheckedConsumer<XContentBuilder, IOException> doc = b -> b.field("test", textValue);
    String query = """
        FROM test
        | EVAL fieldLength = LENGTH(test)
        | LIMIT 10
        | KEEP test, fieldLength
        """;

    // One row: the raw value followed by its length.
    Matcher<?> expectedRow = matchesList().item(textValue).item(textValue.length());
    Matcher<?> expectedColumns = matchesList().item(matchesMap().entry("name", "test").entry("type", any(String.class)))
        .item(matchesMap().entry("name", "fieldLength").entry("type", any(String.class)));

    // Labelled "Pushed down function" in the original expectation list.
    MapMatcher functionLoader = matchesMap().entry("test:column_at_a_time:BytesRefsFromOrds.Singleton", 1);
    // Labelled "Field" in the original expectation list.
    MapMatcher fieldLoader = matchesMap().entry("test:row_stride:BytesRefsFromOrds.Singleton", 1);
    Map<String, List<MapMatcher>> loadersPerDriver = Map.of("data", List.of(functionLoader, fieldLoader));

    test(mapping, doc, query, expectedRow, expectedColumns, loadersPerDriver, sig -> {});
}

/**
 * Indexes a keyword value plus an integer {@code ordering} field, evaluates {@code LENGTH},
 * sorts by {@code ordering} and keeps only {@code test}. Checks the returned row and column
 * and the field loaders expected for the {@code data} and {@code node_reduce} drivers.
 */
public void testLengthPushedWithTopN() throws IOException {
    String textValue = "v".repeat(between(0, 256));
    Integer orderingValue = randomInt();

    CheckedConsumer<XContentBuilder, IOException> mapping = b -> {
        b.startObject("test").field("type", "keyword").endObject();
        b.startObject("ordering").field("type", "integer").endObject();
    };
    CheckedConsumer<XContentBuilder, IOException> doc = b -> b.field("test", textValue).field("ordering", orderingValue);
    String query = """
        FROM test
        | EVAL fieldLength = LENGTH(test)
        | SORT ordering DESC
        | LIMIT 10
        | KEEP test
        """;

    Matcher<?> expectedRow = matchesList().item(textValue);
    Matcher<?> expectedColumns = matchesList().item(matchesMap().entry("name", "test").entry("type", any(String.class)));

    // Loader expected on the data driver: the sort key.
    MapMatcher orderingLoader = matchesMap().entry("ordering:column_at_a_time:IntsFromDocValues.Singleton", 1);
    // Loaders expected on the node_reduce driver.
    // Labelled "Pushed down function" in the original expectation list.
    MapMatcher functionLoader = matchesMap().entry("test:column_at_a_time:Utf8CodePointsFromOrds.Singleton", 1);
    // Labelled "Field" in the original expectation list.
    MapMatcher fieldLoader = matchesMap().entry("test:row_stride:BytesRefsFromOrds.Singleton", 1);
    Map<String, List<MapMatcher>> loadersPerDriver = Map.of(
        "data",
        List.of(orderingLoader),
        "node_reduce",
        List.of(functionLoader, fieldLoader)
    );

    test(mapping, doc, query, expectedRow, expectedColumns, loadersPerDriver, sig -> {});
}

/**
 * Indexes a single keyword value, evaluates {@code LENGTH} and uses the result as the sort
 * key. Checks the returned row, the two returned columns and the field loaders expected for
 * the {@code data} driver.
 */
public void testLengthPushedWithTopNAsOrder() throws IOException {
    String textValue = "v".repeat(between(0, 256));

    CheckedConsumer<XContentBuilder, IOException> mapping = b -> b.startObject("test").field("type", "keyword").endObject();
    CheckedConsumer<XContentBuilder, IOException> doc = b -> b.field("test", textValue);
    String query = """
        FROM test
        | EVAL fieldLength = LENGTH(test)
        | SORT fieldLength DESC
        | LIMIT 10
        | KEEP test, fieldLength
        """;

    // One row: the raw value followed by its length.
    Matcher<?> expectedRow = matchesList().item(textValue).item(textValue.length());
    Matcher<?> expectedColumns = matchesList().item(matchesMap().entry("name", "test").entry("type", any(String.class)))
        .item(matchesMap().entry("name", "fieldLength").entry("type", any(String.class)));

    // Labelled "Pushed down function" in the original expectation list.
    MapMatcher functionLoader = matchesMap().entry("test:column_at_a_time:Utf8CodePointsFromOrds.Singleton", 1);
    // TODO It should not load the field value on the data node, but just on the node_reduce phase
    MapMatcher fieldLoader = matchesMap().entry("test:row_stride:BytesRefsFromOrds.Singleton", 1);
    Map<String, List<MapMatcher>> loadersPerDriver = Map.of("data", List.of(functionLoader, fieldLoader));

    test(mapping, doc, query, expectedRow, expectedColumns, loadersPerDriver, sig -> {});
}

//
// Tests for more complex shapes.
//
Expand Down Expand Up @@ -639,23 +731,34 @@ private void test(
MapMatcher expectedLoaders,
Consumer<List<String>> assertDataNodeSig
) throws IOException {

test(
mapping,
doc,
"""
FROM test
""" + eval + """
| STATS test = MV_SORT(VALUES(test))
""",
expectedValue,
matchesList().item(matchesMap().entry("name", "test").entry("type", any(String.class))),
Map.of("data", List.of(expectedLoaders)),
assertDataNodeSig
);
}

private void test(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reorganized the tests so we can check directly:

  • The expected columns
  • The expected field loaders per driver - we can check the field loaders for data and node_reduce drivers independently.

CheckedConsumer<XContentBuilder, IOException> mapping,
CheckedConsumer<XContentBuilder, IOException> doc,
String query,
Matcher<?> expectedValue,
Matcher<?> columnMatcher,
Map<String, List<MapMatcher>> expectedLoadersPerDriver,
Consumer<List<String>> assertDataNodeSig
) throws IOException {
indexValue(mapping, doc);
RestEsqlTestCase.RequestObjectBuilder builder = requestObjectBuilder().query("""
FROM test
""" + eval + """
| STATS test = MV_SORT(VALUES(test))
""");
/*
* TODO if you just do KEEP test then the load is in the data node reduce driver and not merged:
* \_ProjectExec[[test{f}#7]]
* \_FieldExtractExec[test{f}#7]<[],[]>
* \_EsQueryExec[test], indexMode[standard]]
* \_ExchangeSourceExec[[test{f}#7],false]}, {cluster_name=test-cluster, node_name=test-cluster-0, descrip
* \_ProjectExec[[test{r}#3]]
* \_EvalExec[[LENGTH(test{f}#7) AS test#3]]
* \_LimitExec[1000[INTEGER],50]
* \_ExchangeSourceExec[[test{f}#7],false]}], query={to
*/
RestEsqlTestCase.RequestObjectBuilder builder = requestObjectBuilder().query(query);

builder.profile(true);
Map<String, Object> result = runEsql(builder, new AssertWarnings.NoWarnings(), profileLogger, RestEsqlTestCase.Mode.SYNC);

Expand All @@ -669,22 +772,21 @@ private void test(
.entry("planning", matchesMap().extraOk())
.entry("query", matchesMap().extraOk())
),
matchesList().item(matchesMap().entry("name", "test").entry("type", any(String.class))),
columnMatcher,
matchesList().item(expectedValue)
);
@SuppressWarnings("unchecked")
List<Map<String, Object>> profiles = (List<Map<String, Object>>) ((Map<String, Object>) result.get("profile")).get("drivers");
for (Map<String, Object> p : profiles) {
fixTypesOnProfile(p);
assertThat(p, commonProfile());
List<String> sig = new ArrayList<>();
@SuppressWarnings("unchecked")
List<Map<String, Object>> operators = (List<Map<String, Object>>) p.get("operators");
for (Map<String, Object> o : operators) {
sig.add(checkOperatorProfile(o, expectedLoaders));
}
String description = p.get("description").toString();
switch (description) {

String driverDescription = (String) p.get("description");
List<MapMatcher> mapMatcher = expectedLoadersPerDriver.get(driverDescription);
List<String> sig = checkOperatorProfile(driverDescription, operators, mapMatcher);
switch (driverDescription) {
case "data" -> {
logger.info("data {}", sig);
assertDataNodeSig.accept(sig);
Expand All @@ -694,7 +796,7 @@ private void test(
case "main.final" -> logger.info("main final {}", sig);
case "subplan-0.final" -> logger.info("subplan-0 final {}", sig);
case "subplan-1.final" -> logger.info("subplan-1 final {}", sig);
default -> throw new IllegalArgumentException("can't match " + description);
default -> throw new IllegalArgumentException("can't match " + driverDescription);
}
}
}
Expand Down Expand Up @@ -793,48 +895,36 @@ private void initLookupIndex() throws IOException {
}

private CheckedConsumer<XContentBuilder, IOException> justType(String type) {
return b -> b.startObject("test").field("type", type).endObject();
}

private static String checkOperatorProfile(Map<String, Object> o, MapMatcher expectedLoaders) {
String name = (String) o.get("operator");
name = PushQueriesIT.TO_NAME.matcher(name).replaceAll("");
if (name.equals("ValuesSourceReaderOperator")) {
MapMatcher expectedOp = matchesMap().entry("operator", startsWith(name))
.entry("status", matchesMap().entry("readers_built", expectedLoaders).extraOk());
assertMap(o, expectedOp);
return justType("test", type);
}

/**
 * Builds a mapping callback that writes one object named {@code fieldName} containing a
 * single {@code "type"} entry.
 *
 * @param fieldName name of the object to start on the builder
 * @param type value written under the {@code "type"} key
 * @return a consumer that emits the object on the builder it receives
 */
private CheckedConsumer<XContentBuilder, IOException> justType(String fieldName, String type) {
    return builder -> {
        builder.startObject(fieldName);
        builder.field("type", type);
        builder.endObject();
    };
}

/**
 * Walks the operator profiles of a single driver, asserting on the field loaders reported by
 * every {@code ValuesSourceReaderOperator} and collecting the operator names.
 *
 * @param driverDesc description of the driver the operators belong to; used only in failure messages
 * @param operators operator profiles of that driver; each entry's {@code "operator"} value is read as a String
 * @param expectedLoaders acceptable matchers for the {@code readers_built} status entry; any one of
 *                        them may match. Must be non-null if the driver contains a
 *                        {@code ValuesSourceReaderOperator}, otherwise the assertion fails.
 * @return the operator names in profile order, with every {@code PushQueriesIT.TO_NAME} match removed
 */
private static List<String> checkOperatorProfile(
    String driverDesc,
    List<Map<String, Object>> operators,
    List<MapMatcher> expectedLoaders
) {
    List<String> sig = new ArrayList<>();
    for (Map<String, Object> operator : operators) {
        String name = (String) operator.get("operator");
        name = PushQueriesIT.TO_NAME.matcher(name).replaceAll("");
        if (name.equals("ValuesSourceReaderOperator")) {
            // Fail with a clear message rather than a NullPointerException when the caller did not
            // declare any loaders for a driver that turns out to read field values.
            assertNotNull("Expected loaders to match the ValuesSourceReaderOperator for driver " + driverDesc, expectedLoaders);
            MapMatcher expectedOp = matchesMap().entry("operator", startsWith(name))
                .entry("status", matchesMap().entry("readers_built", anyOf(expectedLoaders.toArray(new MapMatcher[0]))).extraOk());
            assertMap("Error checking values loaded for driver " + driverDesc + "; ", operator, expectedOp);
        }
        sig.add(name);
    }
    // The stray `return name;` left over from the previous single-operator version is gone:
    // `name` is not in scope here and it made this return unreachable.
    return sig;
}

/**
 * Returns the HTTP addresses reported by the shared {@code cluster} rule.
 */
@Override
protected String getTestRestCluster() {
    final String httpAddresses = cluster.getHttpAddresses();
    return httpAddresses;
}

/**
 * Always returns {@code true}.
 */
@Override
protected boolean preserveClusterUponCompletion() {
    // Preserve the cluster to speed up the semantic_text tests
    return true;
}

// Set to true by setUpTextEmbeddingInferenceEndpoint(); nothing in the visible part of this file reads it.
private static boolean setupEmbeddings = false;

/**
 * Marks {@code setupEmbeddings} as done, then sends
 * {@code PUT _inference/text_embedding/test} through the admin client with a
 * {@code text_embedding_test_service} configuration as the JSON body.
 *
 * @throws IOException declared for the admin client request
 */
private void setUpTextEmbeddingInferenceEndpoint() throws IOException {
    setupEmbeddings = true;
    String endpointConfig = """
        {
        "service": "text_embedding_test_service",
        "service_settings": {
        "model": "my_model",
        "api_key": "abc64",
        "dimensions": 128
        },
        "task_settings": {
        }
        }
        """;
    Request putEndpoint = new Request("PUT", "_inference/text_embedding/test");
    putEndpoint.setJsonEntity(endpointConfig);
    adminClient().performRequest(putEndpoint);
}
}
Loading