Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,40 @@ public void testLengthNotPushedToText() throws IOException {
);
}

public void testVCosine() throws IOException {
test(
justType("dense_vector"),
b -> b.startArray("test").value(128).value(128).value(0).endArray(),
"| EVAL test = V_COSINE(test, [0, 255, 255])",
matchesList().item(0.5),
matchesMap().entry("test:column_at_a_time:FloatDenseVectorFromDocValues.Normalized.V_COSINE", 1)
);
}

public void testVHammingToByte() throws IOException {
test(
b -> b.startObject("test").field("type", "dense_vector").field("element_type", "byte").endObject(),
b -> b.startArray("test").value(100).value(100).value(0).endArray(),
"| EVAL test = V_HAMMING(test, [0, 100, 100])",
matchesList().item(6.0),
matchesMap().entry("test:column_at_a_time:ByteDenseVectorFromDocValues.V_HAMMING", 1)
);
}

public void testVHammingToBit() throws IOException {
test(
b -> b.startObject("test").field("type", "dense_vector").field("element_type", "bit").endObject(),
b -> b.startArray("test").value(100).value(100).value(0).endArray(),
"| EVAL test = V_HAMMING(test, [0, 100, 100])",
matchesList().item(6.0),
matchesMap().entry("test:column_at_a_time:BitDenseVectorFromDocValues.V_HAMMING", 1)
);
}

//
// Tests for more complex shapes.
//

/**
* Tests {@code LENGTH} on a field that comes from a {@code LOOKUP JOIN}.
*/
Expand All @@ -120,8 +154,16 @@ public void testLengthNotPushedToLookupJoinKeyword() throws IOException {
| EVAL test = LENGTH(test)
""",
matchesList().item(1),
matchesMap().entry("main_matching:column_at_a_time:BytesRefsFromOrds.Singleton", 1), //
sig -> {}
matchesMap().entry("main_matching:column_at_a_time:BytesRefsFromOrds.Singleton", 1),
sig -> assertMap(
sig,
matchesList().item("LuceneSourceOperator")
.item("ValuesSourceReaderOperator") // the real work is here, checkOperatorProfile checks the status
.item("LookupOperator")
.item("EvalOperator") // this one just renames the field
.item("AggregationOperator")
.item("ExchangeSinkOperator")
)
);
}

Expand All @@ -131,7 +173,6 @@ public void testLengthNotPushedToLookupJoinKeyword() throws IOException {
* querying it.
*/
public void testLengthNotPushedToLookupJoinKeywordSameName() throws IOException {
assumeFalse("fix in 137679 - we push to the index but that's just wrong!", true);
String value = "v".repeat(between(0, 256));
initLookupIndex();
test(b -> {
Expand All @@ -144,40 +185,197 @@ public void testLengthNotPushedToLookupJoinKeywordSameName() throws IOException
| LOOKUP JOIN lookup ON matching == main_matching
| EVAL test = LENGTH(test)
""",
matchesList().item(1), // <--- This is incorrectly returning value.length()
matchesList().item(1),
matchesMap().entry("main_matching:column_at_a_time:BytesRefsFromOrds.Singleton", 1),
// ^^^^ This is incorrectly returning test:column_at_a_time:Utf8CodePointsFromOrds.Singleton
sig -> {}
sig -> assertMap(
sig,
matchesList().item("LuceneSourceOperator")
.item("ValuesSourceReaderOperator") // the real work is here, checkOperatorProfile checks the status
.item("LookupOperator")
.item("EvalOperator") // this one just renames the field
.item("AggregationOperator")
.item("ExchangeSinkOperator")
)
);
}

public void testVCosine() throws IOException {
/**
* Tests {@code LENGTH} on a field that comes from a {@code LOOKUP JOIN}.
*/
public void testLengthPushedInsideInlineStats() throws IOException {
String value = "v".repeat(between(0, 256));
test(
justType("dense_vector"),
b -> b.startArray("test").value(128).value(128).value(0).endArray(),
"| EVAL test = V_COSINE(test, [0, 255, 255])",
matchesList().item(0.5),
matchesMap().entry("test:column_at_a_time:FloatDenseVectorFromDocValues.Normalized.V_COSINE", 1)
justType("keyword"),
b -> b.field("test", value),
"""
| INLINE STATS max_length = MAX(LENGTH(test))
| EVAL test = LENGTH(test)
| WHERE test == max_length
""",
matchesList().item(value.length()),
matchesMap().entry("test:column_at_a_time:Utf8CodePointsFromOrds.Singleton", 1),
sig -> {
// There are two data node plans, one for each phase.
if (sig.contains("FilterOperator")) {
assertMap(
sig,
matchesList().item("LuceneSourceOperator")
.item("ValuesSourceReaderOperator") // the real work is here, checkOperatorProfile checks the status
.item("FilterOperator")
.item("EvalOperator") // this one just renames the field
.item("AggregationOperator")
.item("ExchangeSinkOperator")
);
} else {
assertMap(
sig,
matchesList().item("LuceneSourceOperator")
.item("ValuesSourceReaderOperator") // the real work is here, checkOperatorProfile checks the status
.item("EvalOperator") // this one just renames the field
.item("AggregationOperator")
.item("ExchangeSinkOperator")
);
}
}
);
}

public void testVHammingToByte() throws IOException {
/**
* Tests {@code LENGTH} on a field that comes from a {@code LOOKUP JOIN}.
*/
public void testLengthNotPushedToInlineStatsResults() throws IOException {
String value = "v".repeat(between(0, 256));
test(justType("keyword"), b -> b.field("test", value), """
| INLINE STATS test2 = VALUES(test)
| EVAL test = LENGTH(test2)
""", matchesList().item(value.length()), matchesMap().entry("test:column_at_a_time:BytesRefsFromOrds.Singleton", 1), sig -> {
// There are two data node plans, one for each phase.
if (sig.contains("EvalOperator")) {
assertMap(
sig,
matchesList().item("LuceneSourceOperator")
.item("EvalOperator") // The second phase of the INLINE STATS
.item("AggregationOperator")
.item("ExchangeSinkOperator")
);
} else {
assertMap(
sig,
matchesList().item("LuceneSourceOperator")
.item("ValuesSourceReaderOperator")
.item("AggregationOperator")
.item("ExchangeSinkOperator")
);
}
});
}

/**
* Tests {@code LENGTH} on a field that comes from a {@code LOOKUP JOIN}.
*/
public void testLengthNotPushedToGroupedInlineStatsResults() throws IOException {
String value = "v".repeat(between(0, 256));
CheckedConsumer<XContentBuilder, IOException> mapping = b -> {
b.startObject("test").field("type", "keyword").endObject();
b.startObject("group").field("type", "keyword").endObject();
};
test(mapping, b -> b.field("test", value).field("group", "g"), """
| INLINE STATS test2 = VALUES(test) BY group
| EVAL test = LENGTH(test2)
""", matchesList().item(value.length()), matchesMap().extraOk(), sig -> {
// There are two data node plans, one for each phase.
if (sig.contains("EvalOperator")) {
assertMap(
sig,
matchesList().item("LuceneSourceOperator")
.item("ValuesSourceReaderOperator")
.item("RowInTableLookup")
.item("ColumnLoad")
.item("ProjectOperator")
.item("EvalOperator")
.item("AggregationOperator")
.item("ExchangeSinkOperator")
);
} else {
assertMap(
sig,
matchesList().item("LuceneSourceOperator")
.item("ValuesSourceReaderOperator")
.item("HashAggregationOperator")
.item("ExchangeSinkOperator")
);
}
});
}

/**
* LENGTH not pushed when on a fork branch.
*/
public void testLengthNotPushedToFork() throws IOException {
String value = "v".repeat(between(0, 256));
test(
b -> b.startObject("test").field("type", "dense_vector").field("element_type", "byte").endObject(),
b -> b.startArray("test").value(100).value(100).value(0).endArray(),
"| EVAL test = V_HAMMING(test, [0, 100, 100])",
matchesList().item(6.0),
matchesMap().entry("test:column_at_a_time:ByteDenseVectorFromDocValues.V_HAMMING", 1)
justType("keyword"),
b -> b.field("test", value),
"""
| FORK
(EVAL test = LENGTH(test) + 1)
(EVAL test = LENGTH(test) + 2)
""",
matchesList().item(List.of(value.length() + 1, value.length() + 2)),
matchesMap().entry("test:column_at_a_time:BytesRefsFromOrds.Singleton", 1),
sig -> assertMap(
sig,
matchesList().item("LuceneSourceOperator")
.item("ValuesSourceReaderOperator")
.item("ProjectOperator")
.item("ExchangeSinkOperator")
)
);
}

public void testVHammingToBit() throws IOException {
public void testLengthNotPushedBeforeFork() throws IOException {
String value = "v".repeat(between(0, 256));
test(
b -> b.startObject("test").field("type", "dense_vector").field("element_type", "bit").endObject(),
b -> b.startArray("test").value(100).value(100).value(0).endArray(),
"| EVAL test = V_HAMMING(test, [0, 100, 100])",
matchesList().item(6.0),
matchesMap().entry("test:column_at_a_time:BitDenseVectorFromDocValues.V_HAMMING", 1)
justType("keyword"),
b -> b.field("test", value),
"""
| EVAL test = LENGTH(test)
| FORK
(EVAL j = 1)
(EVAL j = 2)
""",
matchesList().item(value.length()),
matchesMap().entry("test:column_at_a_time:BytesRefsFromOrds.Singleton", 1),
sig -> assertMap(
sig,
matchesList().item("LuceneSourceOperator")
.item("ValuesSourceReaderOperator")
.item("ProjectOperator")
.item("ExchangeSinkOperator")
)
);
}

public void testLengthNotPushedAfterFork() throws IOException {
String value = "v".repeat(between(0, 256));
test(
justType("keyword"),
b -> b.field("test", value),
"""
| FORK
(EVAL j = 1)
(EVAL j = 2)
| EVAL test = LENGTH(test)
""",
matchesList().item(value.length()),
matchesMap().entry("test:column_at_a_time:BytesRefsFromOrds.Singleton", 1),
sig -> assertMap(
sig,
matchesList().item("LuceneSourceOperator")
.item("ValuesSourceReaderOperator")
.item("ProjectOperator")
.item("ExchangeSinkOperator")
)
);
}

Expand Down Expand Up @@ -217,7 +415,7 @@ private void test(
RestEsqlTestCase.RequestObjectBuilder builder = requestObjectBuilder().query("""
FROM test
""" + eval + """
| STATS test = VALUES(test)
| STATS test = MV_SORT(VALUES(test))
""");
/*
* TODO if you just do KEEP test then the load is in the data node reduce driver and not merged:
Expand Down Expand Up @@ -265,6 +463,9 @@ private void test(
}
case "node_reduce" -> logger.info("node_reduce {}", sig);
case "final" -> logger.info("final {}", sig);
case "main.final" -> logger.info("main final {}", sig);
case "subplan-0.final" -> logger.info("subplan-0 final {}", sig);
case "subplan-1.final" -> logger.info("subplan-1 final {}", sig);
default -> throw new IllegalArgumentException("can't match " + description);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,9 @@ public void registerException(Class<? extends Exception> exceptionClass, String
}
delegate.registerException(exceptionClass, message);
}

@Override
public String toString() {
return "warnings for " + source;
}
}
Loading