Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/137678.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 137678
summary: "ESQL: Improve field reference tracking in `FORK` command"
area: ES|QL
type: enhancement
issues:
- 137283
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, boolean ha
// NOTE: the grammar allows wildcards to be used in other commands as well, but these are forbidden in the LogicalPlanBuilder
// Except in KEEP and DROP.
var keepRefs = AttributeSet.builder();
var currentBranchKeepRefs = new Holder<>(AttributeSet.builder());
var dropWildcardRefs = AttributeSet.builder();
// fields required to request for lookup joins to work
var joinRefs = AttributeSet.builder();
Expand All @@ -117,10 +118,17 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, boolean ha
// Early return from forEachDown. We will iterate over the children manually and end the recursion via forEachDown early.
var forkRefsResult = AttributeSet.builder();
forkRefsResult.addAll(referencesBuilder.get());
var parentKeepRefs = AttributeSet.builder();
parentKeepRefs.addAll(keepRefs);

for (var forkBranch : fork.children()) {
// Reset branch-specific state for each fork branch
currentBranchKeepRefs.set(AttributeSet.builder());
currentBranchKeepRefs.get().addAll(parentKeepRefs);
referencesBuilder.set(AttributeSet.builder());

var isNestedFork = forkBranch.forEachDownMayReturnEarly(forEachDownProcessor.get());

// This assert is just for good measure. FORKs within FORKs are not yet supported.
LogicalPlan lastFork = lastSeenFork.get();
if (lastFork != null && fork instanceof UnionAll == false && lastFork instanceof UnionAll == false) {
Expand All @@ -130,11 +138,24 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, boolean ha
// TODO consider deferring the nested fork check to Analyzer verifier or LogicalPlanOptimizer verifier.
assert isNestedFork == false : "Nested FORKs are not yet supported";
}
// This is a safety measure for fork where the list of fields returned is empty.
// It can be empty for a branch that does need all the fields. For example "fork (where true) (where a is not null)"
// but it can also be empty for queries where NO fields are needed from ES,
// for example "fork (eval x = 1 | keep x) (eval y = 1 | keep y)" but we cannot establish this yet.
if (referencesBuilder.get().isEmpty()) {

// Determine if this fork branch requires all fields from the index (projectAll = true).
// This happens when a branch has no explicit field selection and no KEEP constraints.
//
// We trigger projectAll when ALL of the following conditions are met:
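// 1. No KEEP references apply to this branch (currentBranchKeepRefs is empty; note that it is seeded with
// the KEEP references already collected when the FORK was reached, then extended by the branch's own KEEPs)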
// 2. AND either:
// a) No field references were collected (referencesBuilder is empty), OR
// b) The branch contains no commands that require explicit field collection
// (e.g., no PROJECT or STATS commands that would limit field selection)
//
// Examples:
// - "fork (where true) (where a is not null)" → needs all fields (no KEEP, only filters)
// - "fork (eval x = 1 | keep x) (where true)" → needs all fields (second branch has no KEEP)
// - "fork (eval x = 1 | keep x) (eval y = 2 | keep y)" → specific fields only (both branches have KEEP)
if (currentBranchKeepRefs.get().isEmpty()
&& (referencesBuilder.get().isEmpty()
|| false == forkBranch.anyMatch(forkPlan -> shouldCollectReferencedFields(forkPlan, inlinestatsAggs)))) {
projectAll.set(true);
// Return early, we'll be returning all references no matter what the remainder of the query is.
breakEarly.set(true);
Expand Down Expand Up @@ -192,6 +213,7 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, boolean ha
referencesBuilder.get().add(ua);
if (p instanceof Keep) {
keepRefs.add(ua);
currentBranchKeepRefs.get().add(ua);
} else if (p instanceof Drop) {
dropWildcardRefs.add(ua);
} else {
Expand All @@ -200,6 +222,7 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, boolean ha
});
if (p instanceof Keep) {
keepRefs.addAll(p.references());
currentBranchKeepRefs.get().addAll(p.references());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void testDirectFilter() {
}

/**
 * Checks the field names resolved for a FORK with three branches, each of which EVALs a constant column
 * and then KEEPs only that column.
 */
public void testForkEval() {
// NOTE(review): the two assertions below run the same query but expect different sets ("*" vs "_index").
// This looks like both sides of a diff hunk captured together; confirm which expectation is the current one.
assertFieldNames("FROM employees | fork (eval x = 1 | keep x) (eval y = 2 | keep y) (eval z = 3 | keep z)", Set.of("*"));
assertFieldNames("FROM employees | fork (eval x = 1 | keep x) (eval y = 2 | keep y) (eval z = 3 | keep z)", Set.of("_index"));
}

public void testSort1() {
Expand Down Expand Up @@ -2167,7 +2167,7 @@ public void testForkFieldsWithStatsInOneBranch() {
| FORK (WHERE c > 1 AND a < 10000 | EVAL d = a + 500)
(STATS x = count(*), y=min(z))
| WHERE x > y
""", Set.of("_index", "x", "y", "a", "c", "z", "y.*", "x.*", "z.*", "a.*", "c.*"));
""", ALL_FIELDS);
}

public void testForkFieldsWithEnrichAndLookupJoins() {
Expand Down Expand Up @@ -2370,7 +2370,7 @@ public void testFuseWithStats() {
( WHERE author:"Ursula K. Le Guin" AND title:"short stories" | SORT _score, _id DESC | LIMIT 3)
| FUSE
| STATS count_fork=COUNT(*) BY _fork
| SORT _fork""", Set.of("_index", "title", "author", "title.*", "author.*"));
| SORT _fork""", ALL_FIELDS);
}

public void testFuseWithMultipleForkBranches() {
Expand Down Expand Up @@ -2659,19 +2659,16 @@ public void testForkBranchWithLookupJoin() {
}

/**
 * Checks the field names resolved for a four-branch FORK (EVAL/DISSECT, STATS, SORT/LIMIT/EVAL, EVAL-only)
 * that is followed by a STATS over {@code _fork}.
 */
public void testForkBeforeStats() {
// First assertion: expects an explicit set of field names and their ".*" sub-field patterns.
assertFieldNames(
"""
FROM employees
| WHERE emp_no == 10048 OR emp_no == 10081
| FORK ( EVAL a = CONCAT(first_name, " ", emp_no::keyword, " ", last_name)
| DISSECT a "%{x} %{y} %{z}"
| EVAL y = y::keyword )
( STATS x = COUNT(*)::keyword, y = MAX(emp_no)::keyword, z = MIN(emp_no)::keyword )
( SORT emp_no ASC | LIMIT 2 | EVAL x = last_name )
( EVAL x = "abc" | EVAL y = "aaa" )
| STATS c = count(*), m = max(_fork)""",
Set.of("_index", "first_name", "emp_no", "last_name", "last_name.*", "first_name.*", "emp_no.*")
);
// Second assertion: same query text, but expects ALL_FIELDS.
// NOTE(review): the two expectations contradict each other for an identical query; this looks like the
// removed and added sides of a diff hunk captured together; confirm which one is current.
assertFieldNames("""
FROM employees
| WHERE emp_no == 10048 OR emp_no == 10081
| FORK ( EVAL a = CONCAT(first_name, " ", emp_no::keyword, " ", last_name)
| DISSECT a "%{x} %{y} %{z}"
| EVAL y = y::keyword )
( STATS x = COUNT(*)::keyword, y = MAX(emp_no)::keyword, z = MIN(emp_no)::keyword )
( SORT emp_no ASC | LIMIT 2 | EVAL x = last_name )
( EVAL x = "abc" | EVAL y = "aaa" )
| STATS c = count(*), m = max(_fork)""", ALL_FIELDS);
}

public void testForkBeforeStatsWithWhere() {
Expand All @@ -2685,7 +2682,7 @@ public void testForkBeforeStatsWithWhere() {
( SORT emp_no ASC | LIMIT 2 | EVAL x = last_name )
( EVAL x = "abc" | EVAL y = "aaa" )
| STATS a = count(*) WHERE _fork == "fork1",
b = max(_fork)""", Set.of("_index", "first_name", "emp_no", "last_name", "last_name.*", "first_name.*", "emp_no.*"));
b = max(_fork)""", ALL_FIELDS);
}

public void testForkBeforeStatsByWithWhere() {
Expand All @@ -2700,7 +2697,7 @@ public void testForkBeforeStatsByWithWhere() {
( EVAL x = "abc" | EVAL y = "aaa" )
| STATS a = count(*) WHERE emp_no > 10000,
b = max(x) WHERE _fork == "fork1" BY _fork
| SORT _fork""", Set.of("_index", "emp_no", "x", "first_name", "last_name", "last_name.*", "x.*", "first_name.*", "emp_no.*"));
| SORT _fork""", ALL_FIELDS);
}

public void testForkAfterDrop() {
Expand Down Expand Up @@ -2738,7 +2735,20 @@ public void testForkBranchWithKeep() {
FROM languages
| FORK ( WHERE language_name == "English" | KEEP language_name, language_code )
( WHERE language_name != "English" )
| SORT _fork, language_name""", Set.of("_index", "language_name", "language_code", "language_code.*", "language_name.*"));
| SORT _fork, language_name""", ALL_FIELDS);
}

/**
 * FORK with three branches where the first two end in a KEEP and the third only EVALs a constant;
 * the expected field names are {@code IndexResolver.ALL_FIELDS}.
 */
public void testForkBranchWithKeep2() {
    // NOTE(review): sibling tests pass the unqualified ALL_FIELDS; presumably the same constant, so consider aligning.
    String query = "FROM employees | fork (eval x = 1 | keep x) (eval y = 2 | keep y) (eval z = 3)";
    assertFieldNames(query, IndexResolver.ALL_FIELDS);
}

/**
 * FORK with three branches where the first two end in a KEEP and the third is a plain WHERE filter;
 * the expected field names are {@code ALL_FIELDS}.
 */
public void testForkBranchWithKeep3() {
    // The closing delimiter sits at the same indentation as the content, so the resulting string
    // has no leading whitespace on any line and ends with a newline.
    String query = """
        FROM employees
        | FORK (EVAL x = 1 | KEEP x)
        (EVAL y = 2 | KEEP y)
        (WHERE emp_no > 10000)
        """;
    assertFieldNames(query, ALL_FIELDS);
}

public void testForkBeforeRename() {
Expand Down Expand Up @@ -3144,33 +3154,78 @@ public void testStatsChainingWithTimestampCarriedForwardAsByKey() {

/**
 * Checks the field names resolved for a FROM that combines an index with a subquery, followed by
 * WHERE, KEEP and SORT in the main query. Only runs when the SUBQUERY_IN_FROM_COMMAND capability is enabled.
 */
public void testSubqueryInFrom() {
assumeTrue("Requires subquery in FROM command support", EsqlCapabilities.Cap.SUBQUERY_IN_FROM_COMMAND.isEnabled());
// TODO improve FieldNameUtils to process subqueries better, so that we don't call field-caps with "*"
// First assertion: expects the wildcard "*".
assertFieldNames("""
FROM employees, (FROM books | WHERE author:"Faulkner" | KEEP title, author | SORT title | LIMIT 5)
| WHERE emp_no == 10000 OR author IS NOT NULL
| KEEP emp_no, first_name, last_name, author, title
| SORT emp_no, author
""", Set.of("*"));
// Second assertion: same query text, but expects the explicitly kept fields plus their ".*" patterns and "_index".
// NOTE(review): the two expectations contradict each other for an identical query; this looks like the
// removed and added sides of a diff hunk captured together; confirm which one is current.
assertFieldNames(
"""
FROM employees, (FROM books | WHERE author:"Faulkner" | KEEP title, author | SORT title | LIMIT 5)
| WHERE emp_no == 10000 OR author IS NOT NULL
| KEEP emp_no, first_name, last_name, author, title
| SORT emp_no, author
""",
Set.of(
"title.*",
"last_name.*",
"_index",
"emp_no",
"author",
"first_name.*",
"last_name",
"title",
"author.*",
"first_name",
"emp_no.*"
)
);
}

/**
 * Checks the field names resolved when a subquery in FROM is combined with FORK, both with the FORK inside
 * the subquery and with the FORK in the main query. Only runs when the SUBQUERY_IN_FROM_COMMAND capability
 * is enabled.
 */
public void testSubqueryInFromWithFork() {
assumeTrue("Requires subquery in FROM command support", EsqlCapabilities.Cap.SUBQUERY_IN_FROM_COMMAND.isEnabled());
// nested fork may trigger assertion in FieldNameUtils, defer the check of nested subqueries or subquery with fork
// to logical plan optimizer.
// TODO Improve FieldNameUtils to process subqueries better, so that we don't call field-caps with "*"
// FORK inside the subquery: this assertion expects the wildcard "*".
assertFieldNames("""
FROM employees, (FROM books | FORK (WHERE author:"Faulkner") (WHERE title:"Ring") | KEEP title, author | SORT title | LIMIT 5)
| WHERE emp_no == 10000 OR author IS NOT NULL
| KEEP emp_no, first_name, last_name, author, title
| SORT emp_no, author
""", Set.of("*"));
// FORK inside the subquery again (the query appears to be the same one, only wrapped across more lines),
// this time expecting the explicitly kept fields plus their ".*" patterns and "_index".
// NOTE(review): this contradicts the expectation above; it looks like the removed and added sides of a diff
// hunk captured together; confirm which one is current.
assertFieldNames(
"""
FROM employees,
(FROM books | FORK (WHERE author:"Faulkner")
(WHERE title:"Ring") | KEEP title, author | SORT title | LIMIT 5)
| WHERE emp_no == 10000 OR author IS NOT NULL
| KEEP emp_no, first_name, last_name, author, title
| SORT emp_no, author
""",
Set.of(
"title.*",
"last_name.*",
"_index",
"emp_no",
"author",
"first_name.*",
"last_name",
"title",
"author.*",
"first_name",
"emp_no.*"
)
);

// FORK in the main query, after a FROM that includes a subquery: this assertion expects the wildcard "*".
assertFieldNames("""
FROM books, (FROM employees | WHERE emp_no == 10000)
| FORK (WHERE author:"Faulkner") (WHERE title:"Ring")
| KEEP emp_no, first_name, last_name, author, title
| SORT emp_no, author
""", Set.of("*"));
// Same query text as the previous assertion, expecting the explicit field set instead.
// NOTE(review): same old-vs-new contradiction as above; confirm which expectation is current.
assertFieldNames(
"""
FROM books, (FROM employees | WHERE emp_no == 10000)
| FORK (WHERE author:"Faulkner") (WHERE title:"Ring")
| KEEP emp_no, first_name, last_name, author, title
| SORT emp_no, author
""",
Set.of(
"title.*",
"last_name.*",
"_index",
"emp_no",
"author",
"first_name.*",
"last_name",
"title",
"author.*",
"first_name",
"emp_no.*"
)
);
}

private void assertFieldNames(String query, Set<String> expected) {
Expand Down