Merged
@@ -27,7 +27,9 @@
import org.elasticsearch.xpack.esql.expression.predicate.logical.And;
import org.elasticsearch.xpack.esql.expression.predicate.nulls.IsNotNull;
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.Equals;
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThanOrEqual;
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.In;
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.LessThanOrEqual;
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.NotEquals;
import org.elasticsearch.xpack.esql.expression.promql.function.PromqlFunctionRegistry;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules;
@@ -98,16 +100,28 @@ protected LogicalPlan rule(PromqlCommand promqlCommand) {
LogicalPlan promqlPlan = promqlCommand.promqlPlan();

// first replace the Placeholder relation with the child plan
promqlPlan = promqlPlan.transformUp(PlaceholderRelation.class, pr -> promqlCommand.child());
promqlPlan = promqlPlan.transformUp(PlaceholderRelation.class, pr -> withTimestampFilter(promqlCommand, promqlCommand.child()));

// Translate based on plan type
return translate(promqlCommand, promqlPlan);
// Translate based on plan type by converting the plan bottom-up
return map(promqlCommand, promqlPlan).plan();
}

private LogicalPlan translate(PromqlCommand promqlCommand, LogicalPlan promqlPlan) {
// convert the plan bottom-up
MapResult result = map(promqlCommand, promqlPlan);
return result.plan();
private static LogicalPlan withTimestampFilter(PromqlCommand promqlCommand, LogicalPlan plan) {
// start and end are either both set or both null
if (promqlCommand.start().value() != null && promqlCommand.end().value() != null) {
Source promqlSource = promqlCommand.source();
Expression timestamp = promqlCommand.timestamp();
plan = new Filter(
promqlSource,
plan,
new And(
promqlSource,
new GreaterThanOrEqual(promqlSource, timestamp, promqlCommand.start()),
new LessThanOrEqual(promqlSource, timestamp, promqlCommand.end())
)
);
}
return plan;
}

private record MapResult(LogicalPlan plan, Map<String, Expression> extras) {}
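For context, a minimal sketch of what the new rewrite produces when both bounds are set (not a test from the PR; it reuses only the constructors and helpers visible in the hunk above and assumes access to the private withTimestampFilter helper for illustration):

// Sketch only: with start/end present, the placeholder relation ends up wrapped
// in a single Filter whose condition ANDs the two timestamp bounds.
LogicalPlan rewritten = promqlPlan.transformUp(
    PlaceholderRelation.class,
    pr -> withTimestampFilter(promqlCommand, promqlCommand.child())
);
Filter bounds = (Filter) rewritten.collect(Filter.class::isInstance).getFirst();
And condition = (And) bounds.condition();
assert condition.left() instanceof GreaterThanOrEqual;   // @timestamp >= start
assert condition.right() instanceof LessThanOrEqual;     // @timestamp <= end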
@@ -14,6 +14,7 @@
import org.elasticsearch.xpack.esql.analysis.Analyzer;
import org.elasticsearch.xpack.esql.analysis.AnalyzerContext;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.core.expression.predicate.regex.RegexMatch;
import org.elasticsearch.xpack.esql.core.tree.Source;
@@ -41,7 +42,6 @@
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Set;

@@ -57,8 +57,6 @@
// @TestLogging(value = "org.elasticsearch.xpack.esql:TRACE", reason = "debug tests")
public class PromqlLogicalPlanOptimizerTests extends AbstractLogicalPlanOptimizerTests {

private static final String PARAM_FORMATTING = "%1$s";

private static Analyzer tsAnalyzer;

@BeforeClass
@@ -93,7 +91,6 @@ public void testExplainPromql() {
)
""");

logger.trace(plan);
}

public void testExplainPromqlSimple() {
@@ -106,7 +103,6 @@ public void testExplainPromqlSimple() {
| STATS AVG(AVG_OVER_TIME(network.bytes_in)) BY TBUCKET(1h)
""");

logger.trace(plan);
}

/**
@@ -144,8 +140,6 @@ public void testAvgAvgOverTimeOutput() {
| LIMIT 1000
""");

logger.trace(plan);

var project = as(plan, Project.class);
assertThat(project.projections(), hasSize(3));

@@ -240,7 +234,6 @@ public void testTSAvgAvgOverTimeOutput() {
| LIMIT 1000
""");

logger.trace(plan);
}

/**
@@ -270,9 +263,6 @@ public void testTSAvgWithoutByDimension() {
| STATS avg(avg_over_time(network.bytes_in)) BY TBUCKET(1h)
| LIMIT 1000
""");

logger.trace(plan);

}

/**
@@ -306,7 +296,6 @@ public void testPromqlAvgWithoutByDimension() {
| LIMIT 1000
""");

logger.trace(plan);
}

/**
@@ -337,7 +326,6 @@ public void testRangeSelector() {
| promql step 1h ( max by (pod) (avg_over_time(network.bytes_in[1h])) )
""");

logger.trace(plan);
}

@AwaitsFix(bugUrl = "Invalid call to dataType on an unresolved object ?RATE_$1")
@@ -353,7 +341,6 @@ avg by (pod) (rate(network.bytes_in[1h]))
""";

var plan = planPromql(testQuery);
logger.trace(plan);
}

/**
@@ -385,8 +372,10 @@ public void testStartEndStep() {
""";

var plan = planPromql(testQuery);
List<LogicalPlan> collect = plan.collect(Bucket.class::isInstance);
logger.trace(plan);
var filters = plan.collect(Filter.class::isInstance);
assertThat(filters, hasSize(1));
Contributor: Shouldn't we have 2?

Member Author: There's one filter with multiple conditions (metric not null, timestamp >= start, timestamp <= end)

Member Author: I've adjusted the assertion in 6f88cf7 to check that there are two conditions for @timestamp.
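A hedged sketch of the condition shape that makes these counts line up (inferred from the comment above, not copied from the PR; the exact nesting may differ): all three predicates live under one Filter, so collecting Filter nodes finds a single plan node while collecting FieldAttribute references named "@timestamp" finds two, one per bound.

// Illustrative only; metric, timestamp, start and end stand in for the real
// expressions, and the classes are the ones imported by this PR.
static Expression boundedCondition(Source src, Expression metric, Expression timestamp,
                                   Expression start, Expression end) {
    return new And(
        src,
        new IsNotNull(src, metric),                         // metric not null
        new And(
            src,
            new GreaterThanOrEqual(src, timestamp, start),  // first "@timestamp" reference
            new LessThanOrEqual(src, timestamp, end)        // second "@timestamp" reference
        )
    );
}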

var filter = (Filter) filters.getFirst();
assertThat(filter.condition().collect(e -> e instanceof FieldAttribute a && a.name().equals("@timestamp")), hasSize(2));
}

/**
@@ -427,7 +416,6 @@ max by (pod) (avg_over_time(network.bytes_in{pod=~"host-0|host-1|host-2"}[5m]))
assertThat(filters, hasSize(1));
var filter = (Filter) filters.getFirst();
assertThat(filter.condition().anyMatch(In.class::isInstance), equalTo(true));
logger.trace(plan);
}

public void testLabelSelectorPrefix() {
@@ -448,7 +436,6 @@ avg by (pod) (avg_over_time(network.bytes_in{pod=~"host-.*"}[5m]))
var filter = (Filter) filters.getFirst();
assertThat(filter.condition().anyMatch(StartsWith.class::isInstance), equalTo(true));
assertThat(filter.condition().anyMatch(NotEquals.class::isInstance), equalTo(false));
logger.trace(plan);
}

public void testLabelSelectorProperPrefix() {
@@ -518,7 +505,6 @@ sum by (host.name, mountpoint) (last_over_time(system.filesystem.usage{state=~"u
""";

var plan = planPromql(testQuery);
logger.trace(plan);
}

@AwaitsFix(bugUrl = "only aggregations across timeseries are supported at this time (found [foo or bar])")
@@ -539,7 +525,6 @@
""";

var plan = planPromql(testQuery);
logger.trace(plan);
}

// public void testPromqlArithmetricOperators() {
@@ -567,9 +552,9 @@ protected LogicalPlan planPromql(String query) {
query = query.replace("$now-1h", '"' + Instant.now().minus(1, ChronoUnit.HOURS).toString() + '"');
query = query.replace("$now", '"' + Instant.now().toString() + '"');
var analyzed = tsAnalyzer.analyze(parser.createStatement(query));
logger.trace(analyzed);
logger.trace("analyzed plan:\n{}", analyzed);
Contributor: Remove? Or set to debug? Same below.

Member Author: I think the trace level is fine to be able to debug/inspect the plan during testing. I've removed the redundant logger.trace(plan) in all of the other tests as we're doing that in here already.

var optimized = logicalOptimizer.optimize(analyzed);
logger.trace(optimized);
logger.trace("optimized plan:\n{}", optimized);
return optimized;
}
}