Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
import org.elasticsearch.xpack.esql.core.expression.function.Function;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.evaluator.mapper.EvaluatorMapper;
import org.elasticsearch.xpack.esql.expression.function.scalar.EsqlScalarFunction;
import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.promql.selector.LabelMatcher;
Expand Down Expand Up @@ -163,9 +161,7 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
if (super.equals(o)) {
VectorBinaryOperator that = (VectorBinaryOperator) o;
return dropMetricName == that.dropMetricName
&& Objects.equals(match, that.match)
&& Objects.equals(binaryOp, that.binaryOp);
return dropMetricName == that.dropMetricName && Objects.equals(match, that.match) && Objects.equals(binaryOp, that.binaryOp);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,14 @@ public ScalarFunctionFactory asFunction() {
private final ComparisonOp op;
private final boolean boolMode;

public VectorBinaryComparison(Source source, LogicalPlan left, LogicalPlan right, VectorMatch match, boolean boolMode, ComparisonOp op) {
public VectorBinaryComparison(
Source source,
LogicalPlan left,
LogicalPlan right,
VectorMatch match,
boolean boolMode,
ComparisonOp op
) {
super(source, left, right, match, boolMode == false, op);
this.op = op;
this.boolMode = boolMode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,17 @@
import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank;
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
import org.elasticsearch.xpack.esql.plan.logical.promql.PromqlCommand;
import org.elasticsearch.xpack.esql.plan.logical.promql.PromqlParams;
import org.elasticsearch.xpack.esql.plan.logical.show.ShowInfo;
import org.joni.exception.SyntaxException;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
Expand All @@ -117,6 +121,9 @@
*/
public class LogicalPlanBuilder extends ExpressionBuilder {

private static final String TIME = "time", START = "start", END = "end", STEP = "step";
private static final Set<String> PROMQL_ALLOWED_PARAMS = Set.of(TIME, START, END, STEP);

/**
* Maximum number of commands allowed per query
*/
Expand Down Expand Up @@ -1219,52 +1226,7 @@ public PlanFactory visitSampleCommand(EsqlBaseParser.SampleCommandContext ctx) {
@Override
public PlanFactory visitPromqlCommand(EsqlBaseParser.PromqlCommandContext ctx) {
Source source = source(ctx);
Map<String, Expression> params = new HashMap<>();
String TIME = "time", START = "start", END = "end", STEP = "step";
Set<String> allowed = Set.of(TIME, START, END, STEP);

if (ctx.promqlParam().isEmpty()) {
throw new ParsingException(source(ctx), "Parameter [{}] or [{}] is required", STEP, TIME);
}

for (EsqlBaseParser.PromqlParamContext paramCtx : ctx.promqlParam()) {
var paramNameCtx = paramCtx.name;
String name = paramNameCtx.getText();
if (params.containsKey(name)) {
throw new ParsingException(source(paramNameCtx), "[{}] already specified", name);
}
if (allowed.contains(name) == false) {
String message = "Unknown parameter [{}]";
List<String> similar = StringUtils.findSimilar(name, allowed);
if (CollectionUtils.isEmpty(similar) == false) {
message += ", did you mean " + (similar.size() == 1 ? "[" + similar.get(0) + "]" : "any of " + similar) + "?";
}
throw new ParsingException(source(paramNameCtx), message, name);
}
String value = paramCtx.value.getText();
// TODO: validate and convert the value

}

// Validation logic for time parameters
Expression time = params.get(TIME);
Expression start = params.get(START);
Expression end = params.get(END);
Expression step = params.get(STEP);

if (time != null && (start != null || end != null || step != null)) {
throw new ParsingException(
source,
"Specify either [{}] for instant query or [{}}], [{}] or [{}}] for a range query",
TIME,
STEP,
START,
END
);
}
if ((start != null || end != null) && step == null) {
throw new ParsingException(source, "[{}}] is required alongside [{}}] or [{}}]", STEP, START, END);
}
PromqlParams params = parsePromqlParams(ctx, source);

// TODO: Perform type and value validation
var queryCtx = ctx.promqlQueryPart();
Expand Down Expand Up @@ -1292,9 +1254,95 @@ public PlanFactory visitPromqlCommand(EsqlBaseParser.PromqlCommandContext ctx) {
throw PromqlParserUtils.adjustParsingException(pe, promqlStartLine, promqlStartColumn);
}

return plan -> time != null
? new PromqlCommand(source, plan, promqlPlan, time)
: new PromqlCommand(source, plan, promqlPlan, start, end, step);
return plan -> new PromqlCommand(source, plan, promqlPlan, params);
}

private static PromqlParams parsePromqlParams(EsqlBaseParser.PromqlCommandContext ctx, Source source) {
Instant time = null;
Instant start = null;
Instant end = null;
Duration step = null;

Set<String> paramsSeen = new HashSet<>();
for (EsqlBaseParser.PromqlParamContext paramCtx : ctx.promqlParam()) {
String name = param(paramCtx.name);
if (paramsSeen.add(name) == false) {
throw new ParsingException(source(paramCtx.name), "[{}] already specified", name);
}
Source valueSource = source(paramCtx.value);
String valueString = param(paramCtx.value);
switch (name) {
case TIME -> time = PromqlParserUtils.parseDate(valueSource, valueString);
case START -> start = PromqlParserUtils.parseDate(valueSource, valueString);
case END -> end = PromqlParserUtils.parseDate(valueSource, valueString);
case STEP -> {
try {
step = Duration.ofSeconds(Integer.parseInt(valueString));
} catch (NumberFormatException ignore) {
step = PromqlParserUtils.parseDuration(valueSource, valueString);
}
}
default -> {
String message = "Unknown parameter [{}]";
List<String> similar = StringUtils.findSimilar(name, PROMQL_ALLOWED_PARAMS);
if (CollectionUtils.isEmpty(similar) == false) {
message += ", did you mean " + (similar.size() == 1 ? "[" + similar.get(0) + "]" : "any of " + similar) + "?";
}
throw new ParsingException(source(paramCtx.name), message, name);
}
}
}

// Validation logic for time parameters
if (time != null) {
if (start != null || end != null || step != null) {
throw new ParsingException(
source,
"Specify either [{}] for instant query or [{}}], [{}] or [{}}] for a range query",
TIME,
STEP,
START,
END
);
}
} else if (step != null) {
if (start != null || end != null) {
if (start == null || end == null) {
throw new ParsingException(
source,
"Parameters [{}] and [{}] must either both be specified or both be omitted for a range query",
START,
END
);
}
if (end.isBefore(start)) {
throw new ParsingException(
source,
"invalid parameter \"end\": end timestamp must not be before start time",
end,
start
);
}
}
if (step.isPositive() == false) {
throw new ParsingException(
source,
"invalid parameter \"step\": zero or negative query resolution step widths are not accepted. "
+ "Try a positive integer",
step
);
}
} else {
throw new ParsingException(source, "Parameter [{}] or [{}] is required", STEP, TIME);
}
return new PromqlParams(time, start, end, step);
}

private static String param(EsqlBaseParser.PromqlParamContentContext paramCtx) {
if (paramCtx.QUOTED_IDENTIFIER() != null) {
return AbstractBuilder.unquote(paramCtx.QUOTED_IDENTIFIER().getText());
} else {
return paramCtx.getText();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,7 @@ public Duration visitDuration(DurationContext ctx) {
}

// Non-literal LogicalPlan
throw new ParsingException(
source(ctx),
"Duration must be a constant expression"
);
throw new ParsingException(source(ctx), "Duration must be a constant expression");
}
case Expression e -> {
// Fallback for Expression (shouldn't happen with new logic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,7 @@ private static Duration arithmetics(Source source, Duration left, Duration right
Duration result = switch (op) {
case ADD -> left.plus(right);
case SUB -> left.minus(right);
default -> throw new ParsingException(
source,
"Operation [{}] not supported between two durations",
op
);
default -> throw new ParsingException(source, "Operation [{}] not supported between two durations", op);
};

return result;
Expand Down Expand Up @@ -120,11 +116,7 @@ private static Duration arithmetics(Source source, Number scalar, Duration durat
case ADD -> arithmetics(source, duration, scalar, ArithmeticOp.ADD);
case SUB -> arithmetics(source, Duration.ofSeconds(scalar.longValue()), duration, ArithmeticOp.SUB);
case MUL -> arithmetics(source, duration, scalar, ArithmeticOp.MUL);
default -> throw new ParsingException(
source,
"Operation [{}] not supported with scalar on left and duration on right",
op
);
default -> throw new ParsingException(source, "Operation [{}] not supported with scalar on left and duration on right", op);
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.elasticsearch.xpack.esql.expression.promql.predicate.operator.comparison.VectorBinaryComparison.ComparisonOp;
import org.elasticsearch.xpack.esql.expression.promql.predicate.operator.set.VectorBinarySet;
import org.elasticsearch.xpack.esql.expression.promql.subquery.Subquery;
import org.elasticsearch.xpack.esql.parser.EsqlBaseParser;
import org.elasticsearch.xpack.esql.parser.ParsingException;
import org.elasticsearch.xpack.esql.parser.PromqlBaseParser;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
Expand Down Expand Up @@ -296,7 +295,7 @@ public LogicalPlan visitArithmeticBinary(PromqlBaseParser.ArithmeticBinaryContex
Object leftValue = leftLiteral.value();
Object rightValue = rightLiteral.value();

// arithmetics
// arithmetics
if (binaryOperator instanceof ArithmeticOp arithmeticOp) {
Object result = PromqlFoldingUtils.evaluate(source, leftValue, rightValue, arithmeticOp);
DataType resultType = determineResultType(result);
Expand Down Expand Up @@ -356,20 +355,12 @@ public LogicalPlan visitArithmeticBinary(PromqlBaseParser.ArithmeticBinaryContex

return switch (binaryOperator) {
case ArithmeticOp arithmeticOp -> new VectorBinaryArithmetic(source, le, re, modifier, arithmeticOp);
case ComparisonOp comparisonOp -> new VectorBinaryComparison(
source,
le,
re,
modifier,
bool,
comparisonOp
);
case ComparisonOp comparisonOp -> new VectorBinaryComparison(source, le, re, modifier, bool, comparisonOp);
case VectorBinarySet.SetOp setOp -> new VectorBinarySet(source, le, re, modifier, setOp);
default -> throw new ParsingException(source(ctx.op), "Unknown arithmetic {}", opText);
};
}


private BinaryOp binaryOp(Token opType) {
return switch (opType.getType()) {
case CARET -> ArithmeticOp.POW;
Expand Down Expand Up @@ -548,8 +539,7 @@ public Duration visitSubqueryResolution(PromqlBaseParser.SubqueryResolutionConte
return duration;
}

throw new ParsingException(source(ctx), "Expected duration result, got [{}]",
result.getClass().getSimpleName());
throw new ParsingException(source(ctx), "Expected duration result, got [{}]", result.getClass().getSimpleName());
}

// Just COLON with no resolution - use default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.elasticsearch.xpack.esql.parser.ParsingException;

import java.time.Duration;
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.LinkedHashMap;
import java.util.Map;

Expand Down Expand Up @@ -272,4 +274,20 @@ private static int adjustColumn(int lineNumber, int columnNumber, int startColum
// the column offset only applies to the first line of the PROMQL command
return lineNumber == 1 ? columnNumber + startColumn - 1 : columnNumber;
}

/*
* Parses a Prometheus date which can be either a float representing epoch seconds or an RFC3339 date string.
*/
public static Instant parseDate(Source source, String value) {
try {
return Instant.ofEpochMilli((long) (Double.parseDouble(value) * 1000));
} catch (NumberFormatException ignore) {
// Not a float, try parsing as date string
}
try {
return Instant.parse(value);
} catch (DateTimeParseException e) {
throw new ParsingException(source, "Invalid date format [{}]", value);
}
}
}
Loading