From bfbe53843d265b8c1c4f7d84decd574b6465865a Mon Sep 17 00:00:00 2001 From: mspruc Date: Fri, 25 Apr 2025 09:45:42 +0200 Subject: [PATCH 1/2] add fetch, offset & limit for java platforms in sql-api --- .../calcite/converter/WayangSortVisitor.java | 27 ++++++++--- .../converter/functions/SortFilter.java | 47 +++++++++++++++++++ .../api/sql/calcite/rules/WayangSortRule.java | 4 +- .../wayang/api/sql/SqlToWayangRelTest.java | 2 +- 4 files changed, 70 insertions(+), 10 deletions(-) create mode 100644 wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/SortFilter.java diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangSortVisitor.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangSortVisitor.java index 5176f24b2..b6df413b5 100644 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangSortVisitor.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangSortVisitor.java @@ -22,12 +22,14 @@ import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelFieldCollation.Direction; +import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; - +import org.apache.wayang.api.sql.calcite.converter.functions.SortFilter; import org.apache.wayang.api.sql.calcite.converter.functions.SortKeyExtractor; import org.apache.wayang.api.sql.calcite.rel.WayangSort; import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.FilterOperator; import org.apache.wayang.basic.operators.SortOperator; import org.apache.wayang.core.function.TransformationDescriptor; import org.apache.wayang.core.plan.wayangplan.Operator; @@ -45,12 +47,14 @@ Operator visit(final WayangSort wayangRelNode) { final Operator childOp = wayangRelConverter.convert(wayangRelNode.getInput()); - //TODO: implement fetch & offset for java - final RexNode fetch = wayangRelNode.fetch; - final RexLiteral offset = (RexLiteral) wayangRelNode.offset; + // TODO: implement fetch & offset for java + final RexLiteral fetch = (RexLiteral) wayangRelNode.fetch; + final RexInputRef offset = (RexInputRef) wayangRelNode.offset; + + // if (fetch != null || offset != null) throw new + // UnsupportedOperationException("Offset and fetch currently not supported, + // these appear via LIMIT statements in SQL"); - if (fetch != null || offset != null) throw new UnsupportedOperationException("Offset and fetch currently not supported, these appear via LIMIT statements in SQL"); - final RelCollation collation = wayangRelNode.getCollation(); final List collationDirections = collation.getFieldCollations().stream() @@ -71,7 +75,16 @@ Operator visit(final WayangSort wayangRelNode) { childOp.connectTo(0, sort, 0); - return sort; + + final SortFilter sortFilter = new SortFilter( + fetch != null ? RexLiteral.intValue(fetch) : Integer.MAX_VALUE, + offset != null ? RexLiteral.intValue(offset) : 0); + + final FilterOperator filter = new FilterOperator(sortFilter, Record.class); + + sort.connectTo(0, filter, 0); + + return filter; } } diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/SortFilter.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/SortFilter.java new file mode 100644 index 000000000..3ecd1a24b --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/SortFilter.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.api.sql.calcite.converter.functions; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.core.function.FunctionDescriptor; + +public class SortFilter implements FunctionDescriptor.SerializablePredicate { + final int offset; + final int fetch; + int increment; + + /** + * The filter for a calcite/sql sort operator + * usually triggered by "LIMIT x", "OFFSET x", "FETCH x" statements + * @param offset amount of records ignored before accepting + * @param fetch amount of records accepted + */ + public SortFilter(final int fetch, final int offset) { + this.fetch = fetch; + this.offset = offset; + } + + @Override + public boolean test(final Record record) { + final boolean test = increment >= offset && increment <= fetch; + increment++; + + return test; + } +} \ No newline at end of file diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rules/WayangSortRule.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rules/WayangSortRule.java index 7011b82fe..b7bfcf428 100644 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rules/WayangSortRule.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/rules/WayangSortRule.java @@ -54,7 +54,7 @@ public RelNode convert(final RelNode rel) { sort.getHints(), newInput, sort.collation, - sort.fetch, - sort.offset); + sort.offset, + sort.fetch); } } diff --git a/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java b/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java index 37a93a305..daa35f30a 100755 --- a/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java +++ b/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java @@ -328,7 +328,7 @@ public void filterWithLike() throws Exception { assert (result.stream().anyMatch(rec -> rec.equals(new Record("test1", "test1", "test2")))); } - //@Test + @Test public void javaLimit() throws Exception { final SqlContext sqlContext = createSqlContext("/data/exampleSort.csv"); From cd547188aed310a458bcd9a2f4b7c388c68c23cd Mon Sep 17 00:00:00 2001 From: mspruc Date: Fri, 25 Apr 2025 09:46:56 +0200 Subject: [PATCH 2/2] rm unused import --- .../wayang/api/sql/calcite/converter/WayangSortVisitor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangSortVisitor.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangSortVisitor.java index b6df413b5..fec635f47 100644 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangSortVisitor.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangSortVisitor.java @@ -24,7 +24,7 @@ import org.apache.calcite.rel.RelFieldCollation.Direction; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexLiteral; -import org.apache.calcite.rex.RexNode; + import org.apache.wayang.api.sql.calcite.converter.functions.SortFilter; import org.apache.wayang.api.sql.calcite.converter.functions.SortKeyExtractor; import org.apache.wayang.api.sql.calcite.rel.WayangSort;