Skip to content

Commit

Permalink
ESQL: Test expression translators for binary comparisons and ranges (#…
Browse files Browse the repository at this point in the history
…108609)

Part of #106679

Add tests for our query translators for binary comparisons and ranges.

It turns out that our query translators, which we use whenever we push down
filters to Lucene, have only partial test coverage, in some of the
LocalPhysicalPlanOptimizerTests. We need to expand test coverage, so this
sets up the necessary plumbing and adds the first query translator spec
tests.
  • Loading branch information
alex-spies committed May 16, 2024
1 parent 8a43665 commit 5108f6b
Show file tree
Hide file tree
Showing 4 changed files with 265 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,12 @@
import static org.elasticsearch.xpack.ql.util.NumericUtils.unsignedLongAsNumber;

public final class EsqlExpressionTranslators {

public static final List<ExpressionTranslator<?>> QUERY_TRANSLATORS = List.of(
new EqualsIgnoreCaseTranslator(),
new BinaryComparisons(),
new SpatialRelatesTranslator(),
// Ranges is redundant until we start combining binary comparisons (see CombineBinaryComparisons in ql's OptimizerRules)
// or introduce a BETWEEN keyword.
new ExpressionTranslators.Ranges(),
new ExpressionTranslators.BinaryLogic(),
new ExpressionTranslators.IsNulls(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.elasticsearch.xpack.esql.analysis.Verifier;
import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy;
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
import org.elasticsearch.xpack.esql.parser.EsqlParser;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
Expand All @@ -42,8 +41,6 @@
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
import org.elasticsearch.xpack.esql.planner.FilterTests;
import org.elasticsearch.xpack.esql.planner.Mapper;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery;
import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
Expand Down Expand Up @@ -93,13 +90,7 @@ public class LocalPhysicalPlanOptimizerTests extends MapperServiceTestCase {
*/
private static final int KEYWORD_EST = EstimatesRowSize.estimateSize(DataTypes.KEYWORD);

private EsqlParser parser;
private Analyzer analyzer;
private LogicalPlanOptimizer logicalOptimizer;
private PhysicalPlanOptimizer physicalPlanOptimizer;
private EsqlFunctionRegistry functionRegistry;
private Mapper mapper;

private TestPlannerOptimizer plannerOptimizer;
private final EsqlConfiguration config;
private final SearchStats IS_SV_STATS = new TestSearchStats() {
@Override
Expand All @@ -126,11 +117,6 @@ public LocalPhysicalPlanOptimizerTests(String name, EsqlConfiguration config) {

@Before
public void init() {
parser = new EsqlParser();
logicalOptimizer = new LogicalPlanOptimizer(new LogicalOptimizerContext(EsqlTestUtils.TEST_CFG));
physicalPlanOptimizer = new PhysicalPlanOptimizer(new PhysicalOptimizerContext(config));
functionRegistry = new EsqlFunctionRegistry();
mapper = new Mapper(functionRegistry);
EnrichResolution enrichResolution = new EnrichResolution();
enrichResolution.addResolvedPolicy(
"foo",
Expand All @@ -146,15 +132,18 @@ public void init() {
)
)
);
analyzer = makeAnalyzer("mapping-basic.json", enrichResolution);
plannerOptimizer = new TestPlannerOptimizer(config, makeAnalyzer("mapping-basic.json", enrichResolution));
}

private Analyzer makeAnalyzer(String mappingFileName, EnrichResolution enrichResolution) {
var mapping = loadMapping(mappingFileName);
EsIndex test = new EsIndex("test", mapping, Set.of("test"));
IndexResolution getIndexResult = IndexResolution.valid(test);

return new Analyzer(new AnalyzerContext(config, functionRegistry, getIndexResult, enrichResolution), new Verifier(new Metrics()));
return new Analyzer(
new AnalyzerContext(config, new EsqlFunctionRegistry(), getIndexResult, enrichResolution),
new Verifier(new Metrics())
);
}

/**
Expand All @@ -167,7 +156,7 @@ private Analyzer makeAnalyzer(String mappingFileName, EnrichResolution enrichRes
*/
// TODO: this is suboptimal due to eval not being removed/folded
public void testCountAllWithEval() {
var plan = plan("""
var plan = plannerOptimizer.plan("""
from test | eval s = salary | rename s as sr | eval hidden_s = sr | rename emp_no as e | where e < 10050
| stats c = count(*)
""");
Expand All @@ -186,7 +175,7 @@ public void testCountAllWithEval() {
* limit[],
*/
public void testCountAllWithFilter() {
var plan = plan("from test | where emp_no > 10040 | stats c = count(*)");
var plan = plannerOptimizer.plan("from test | where emp_no > 10040 | stats c = count(*)");
var stat = queryStatsFor(plan);
assertThat(stat.type(), is(StatsType.COUNT));
assertThat(stat.query(), is(nullValue()));
Expand All @@ -206,7 +195,7 @@ public void testCountAllWithFilter() {
* limit[],
*/
public void testCountFieldWithFilter() {
var plan = plan("from test | where emp_no > 10040 | stats c = count(emp_no)", IS_SV_STATS);
var plan = plannerOptimizer.plan("from test | where emp_no > 10040 | stats c = count(emp_no)", IS_SV_STATS);
var stat = queryStatsFor(plan);
assertThat(stat.type(), is(StatsType.COUNT));
assertThat(stat.query(), is(QueryBuilders.existsQuery("emp_no")));
Expand All @@ -224,7 +213,7 @@ public void testCountFieldWithFilter() {
* }
*/
public void testCountFieldWithEval() {
var plan = plan("""
var plan = plannerOptimizer.plan("""
from test | eval s = salary | rename s as sr | eval hidden_s = sr | rename emp_no as e | where e < 10050
| stats c = count(hidden_s)
""", IS_SV_STATS);
Expand All @@ -242,7 +231,7 @@ public void testCountFieldWithEval() {

// optimizer doesn't know yet how to push down count over field
public void testCountOneFieldWithFilter() {
var plan = plan("""
var plan = plannerOptimizer.plan("""
from test
| where salary > 1000
| stats c = count(salary)
Expand All @@ -267,7 +256,7 @@ public void testCountOneFieldWithFilter() {

// optimizer doesn't know yet how to push down count over field
public void testCountOneFieldWithFilterAndLimit() {
var plan = plan("""
var plan = plannerOptimizer.plan("""
from test
| where salary > 1000
| limit 10
Expand Down Expand Up @@ -334,15 +323,15 @@ private PhysicalPlan planWithMappingAndDocs(String query, String mapping, List<S
}, directoryReader -> {
IndexSearcher searcher = newSearcher(directoryReader);
SearchExecutionContext ctx = createSearchExecutionContext(mapperService, searcher);
plan.set(plan(query, new SearchStats(List.of(ctx))));
plan.set(plannerOptimizer.plan(query, new SearchStats(List.of(ctx))));
});

return plan.get();
}

// optimizer doesn't know yet how to break down different multi count
public void testCountMultipleFieldsWithFilter() {
var plan = plan("""
var plan = plannerOptimizer.plan("""
from test
| where salary > 1000 and emp_no > 10010
| stats cs = count(salary), ce = count(emp_no)
Expand All @@ -351,7 +340,7 @@ public void testCountMultipleFieldsWithFilter() {
}

public void testAnotherCountAllWithFilter() {
var plan = plan("""
var plan = plannerOptimizer.plan("""
from test
| where emp_no > 10010
| stats c = count()
Expand All @@ -372,7 +361,7 @@ public void testAnotherCountAllWithFilter() {

// optimizer doesn't know yet how to normalize and deduplicate count(*), count(), count(1) etc.
public void testMultiCountAllWithFilter() {
var plan = plan("""
var plan = plannerOptimizer.plan("""
from test
| where emp_no > 10010
| stats c = count(), call = count(*), c_literal = count(1)
Expand All @@ -382,7 +371,7 @@ public void testMultiCountAllWithFilter() {

// optimizer doesn't know yet how to break down different multi count
public void testCountFieldsAndAllWithFilter() {
var plan = plan("""
var plan = plannerOptimizer.plan("""
from test
| where emp_no > 10010
| stats c = count(), cs = count(salary), ce = count(emp_no)
Expand All @@ -405,7 +394,7 @@ public boolean exists(String field) {
}
};

var plan = plan("""
var plan = plannerOptimizer.plan("""
from test
| where emp_no > 10010
| stats c = count()
Expand All @@ -431,7 +420,7 @@ public boolean exists(String field) {
* \_EsQueryExec[test], query[{"exists":{"field":"emp_no","boost":1.0}}][_doc{f}#13], limit[1000], sort[] estimatedRowSize[324]
*/
public void testIsNotNullPushdownFilter() {
var plan = plan("from test | where emp_no is not null");
var plan = plannerOptimizer.plan("from test | where emp_no is not null");

var limit = as(plan, LimitExec.class);
var exchange = as(limit.child(), ExchangeExec.class);
Expand All @@ -455,7 +444,7 @@ public void testIsNotNullPushdownFilter() {
* limit[1000], sort[] estimatedRowSize[324]
*/
public void testIsNullPushdownFilter() {
var plan = plan("from test | where emp_no is null");
var plan = plannerOptimizer.plan("from test | where emp_no is null");

var limit = as(plan, LimitExec.class);
var exchange = as(limit.child(), ExchangeExec.class);
Expand All @@ -479,7 +468,9 @@ public void testIsNullPushdownFilter() {
*/
public void testIsNotNull_TextField_Pushdown() {
String textField = randomFrom("gender", "job");
var plan = plan(String.format(Locale.ROOT, "from test | where %s is not null | stats count(%s)", textField, textField));
var plan = plannerOptimizer.plan(
String.format(Locale.ROOT, "from test | where %s is not null | stats count(%s)", textField, textField)
);

var limit = as(plan, LimitExec.class);
var finalAgg = as(limit.child(), AggregateExec.class);
Expand All @@ -503,7 +494,7 @@ public void testIsNotNull_TextField_Pushdown() {
*/
public void testIsNull_TextField_Pushdown() {
String textField = randomFrom("gender", "job");
var plan = plan(String.format(Locale.ROOT, "from test | where %s is null", textField, textField));
var plan = plannerOptimizer.plan(String.format(Locale.ROOT, "from test | where %s is null", textField, textField));

var limit = as(plan, LimitExec.class);
var exchange = as(limit.child(), ExchangeExec.class);
Expand All @@ -528,7 +519,7 @@ public void testIsNull_TextField_Pushdown() {
* [vector=ConstantBooleanVector[positions=1, value=true]]]]
*/
public void testIsNull_TextField_Pushdown_WithCount() {
var plan = plan("""
var plan = plannerOptimizer.plan("""
from test
| eval filtered_job = job, count_job = job
| where filtered_job IS NULL
Expand Down Expand Up @@ -559,7 +550,7 @@ public void testIsNull_TextField_Pushdown_WithCount() {
* }]]], query[{"exists":{"field":"job","boost":1.0}}][count{r}#25, seen{r}#26], limit[],
*/
public void testIsNotNull_TextField_Pushdown_WithCount() {
var plan = plan("""
var plan = plannerOptimizer.plan("""
from test
| eval filtered_job = job, count_job = job
| where filtered_job IS NOT NULL
Expand Down Expand Up @@ -600,7 +591,7 @@ public void testCidrMatchPushdownFilter() {
String cidrMatch = format(null, "cidr_match({}, {})", fieldName, cidrBlocksString);

var query = "from test | where " + cidrMatch;
var plan = plan(query, EsqlTestUtils.TEST_SEARCH_STATS, allTypeMappingAnalyzer);
var plan = plannerOptimizer.plan(query, EsqlTestUtils.TEST_SEARCH_STATS, allTypeMappingAnalyzer);

var limit = as(plan, LimitExec.class);
var exchange = as(limit.child(), ExchangeExec.class);
Expand Down Expand Up @@ -705,7 +696,7 @@ public void testOutOfRangeFilterPushdown() {
* \_EsQueryExec[test], query[{"esql_single_value":{"field":"byte","next":{"match_all":{"boost":1.0}},...}}]
*/
private EsQueryExec doTestOutOfRangeFilterPushdown(String query, Analyzer analyzer) {
var plan = plan(query, EsqlTestUtils.TEST_SEARCH_STATS, analyzer);
var plan = plannerOptimizer.plan(query, EsqlTestUtils.TEST_SEARCH_STATS, analyzer);

var limit = as(plan, LimitExec.class);
var exchange = as(limit.child(), ExchangeExec.class);
Expand All @@ -729,7 +720,7 @@ private EsQueryExec doTestOutOfRangeFilterPushdown(String query, Analyzer analyz
public void testMissingFieldsDoNotGetExtracted() {
var stats = EsqlTestUtils.statsForMissingField("first_name", "last_name", "emp_no", "salary");

var plan = plan("from test", stats);
var plan = plannerOptimizer.plan("from test", stats);
var limit = as(plan, LimitExec.class);
var exchange = as(limit.child(), ExchangeExec.class);
var project = as(exchange.child(), ProjectExec.class);
Expand Down Expand Up @@ -770,45 +761,6 @@ private Stat queryStatsFor(PhysicalPlan plan) {
return stat;
}

private PhysicalPlan plan(String query) {
return plan(query, EsqlTestUtils.TEST_SEARCH_STATS);
}

private PhysicalPlan plan(String query, SearchStats stats) {
return plan(query, stats, analyzer);
}

private PhysicalPlan plan(String query, SearchStats stats, Analyzer analyzer) {
var physical = optimizedPlan(physicalPlan(query, analyzer), stats);
return physical;
}

private PhysicalPlan optimizedPlan(PhysicalPlan plan, SearchStats searchStats) {
// System.out.println("* Physical Before\n" + plan);
var physicalPlan = EstimatesRowSize.estimateRowSize(0, physicalPlanOptimizer.optimize(plan));
// System.out.println("* Physical After\n" + physicalPlan);
// the real execution breaks the plan at the exchange and then decouples the plan
// this is of no use in the unit tests, which checks the plan as a whole instead of each
// individually hence why here the plan is kept as is

var logicalTestOptimizer = new LocalLogicalPlanOptimizer(new LocalLogicalOptimizerContext(config, searchStats));
var physicalTestOptimizer = new TestLocalPhysicalPlanOptimizer(new LocalPhysicalOptimizerContext(config, searchStats), true);
var l = PlannerUtils.localPlan(physicalPlan, logicalTestOptimizer, physicalTestOptimizer);

// handle local reduction alignment
l = PhysicalPlanOptimizerTests.localRelationshipAlignment(l);

// System.out.println("* Localized DataNode Plan\n" + l);
return l;
}

private PhysicalPlan physicalPlan(String query, Analyzer analyzer) {
var logical = logicalOptimizer.optimize(analyzer.analyze(parser.createStatement(query)));
// System.out.println("Logical\n" + logical);
var physical = mapper.map(logical);
return physical;
}

@Override
protected List<String> filteredWarnings() {
return withDefaultLimitWarning(super.filteredWarnings());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.analysis.Analyzer;
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
import org.elasticsearch.xpack.esql.parser.EsqlParser;
import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.planner.Mapper;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
import org.elasticsearch.xpack.esql.stats.SearchStats;

public class TestPlannerOptimizer {
private final EsqlParser parser;
private final Analyzer analyzer;
private final LogicalPlanOptimizer logicalOptimizer;
private final PhysicalPlanOptimizer physicalPlanOptimizer;
private final EsqlFunctionRegistry functionRegistry;
private final Mapper mapper;
private final EsqlConfiguration config;

public TestPlannerOptimizer(EsqlConfiguration config, Analyzer analyzer) {
this.analyzer = analyzer;
this.config = config;

parser = new EsqlParser();
logicalOptimizer = new LogicalPlanOptimizer(new LogicalOptimizerContext(config));
physicalPlanOptimizer = new PhysicalPlanOptimizer(new PhysicalOptimizerContext(config));
functionRegistry = new EsqlFunctionRegistry();
mapper = new Mapper(functionRegistry);
}

public PhysicalPlan plan(String query) {
return plan(query, EsqlTestUtils.TEST_SEARCH_STATS);
}

public PhysicalPlan plan(String query, SearchStats stats) {
return plan(query, stats, analyzer);
}

public PhysicalPlan plan(String query, SearchStats stats, Analyzer analyzer) {
var physical = optimizedPlan(physicalPlan(query, analyzer), stats);
return physical;
}

private PhysicalPlan optimizedPlan(PhysicalPlan plan, SearchStats searchStats) {
// System.out.println("* Physical Before\n" + plan);
var physicalPlan = EstimatesRowSize.estimateRowSize(0, physicalPlanOptimizer.optimize(plan));
// System.out.println("* Physical After\n" + physicalPlan);
// the real execution breaks the plan at the exchange and then decouples the plan
// this is of no use in the unit tests, which checks the plan as a whole instead of each
// individually hence why here the plan is kept as is

var logicalTestOptimizer = new LocalLogicalPlanOptimizer(new LocalLogicalOptimizerContext(config, searchStats));
var physicalTestOptimizer = new TestLocalPhysicalPlanOptimizer(new LocalPhysicalOptimizerContext(config, searchStats), true);
var l = PlannerUtils.localPlan(physicalPlan, logicalTestOptimizer, physicalTestOptimizer);

// handle local reduction alignment
l = PhysicalPlanOptimizerTests.localRelationshipAlignment(l);

// System.out.println("* Localized DataNode Plan\n" + l);
return l;
}

private PhysicalPlan physicalPlan(String query, Analyzer analyzer) {
var logical = logicalOptimizer.optimize(analyzer.analyze(parser.createStatement(query)));
// System.out.println("Logical\n" + logical);
var physical = mapper.map(logical);
return physical;
}
}

0 comments on commit 5108f6b

Please sign in to comment.