diff --git a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/DBSPCompiler.java b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/DBSPCompiler.java index 788feacec1c..b79f3176fd3 100644 --- a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/DBSPCompiler.java +++ b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/DBSPCompiler.java @@ -98,6 +98,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.regex.Pattern; @@ -750,6 +751,12 @@ void renameIdentifiers(List statements) { static final Pattern ITEM_ERROR = Pattern.compile("Cannot apply 'ITEM' to arguments of type 'ITEM\\(([^,]+), ([^']+)\\)'(.*)", Pattern.DOTALL); + static SourcePositionRange getRange(CalciteContextException e) { + return new SourcePositionRange( + new SourcePosition(e.getPosLine(), e.getPosColumn()), + new SourcePosition(e.getEndPosLine(), e.getEndPosColumn())); + } + /** Rewrite the error message for some Calcite errors which are confusing */ private CompilationError improveErrorMessage(CalciteContextException e) { String message = e.getMessage(); @@ -760,11 +767,19 @@ private CompilationError improveErrorMessage(CalciteContextException e) { String index = matcher.group(2); String tail = matcher.group(3); String newMessage = "Cannot apply indexing to arguments of type " + source + "[" + index + "]" + tail; - return new CompilationError( - newMessage, - new SourcePositionRange( - new SourcePosition(e.getPosLine(), e.getPosColumn()), - new SourcePosition(e.getEndPosLine(), e.getEndPosColumn()))); + return new CompilationError(newMessage, getRange(e)); + } + if (message.contains("'TIMESTAMPDIFF'.")) { + // The Calcite parser replaces DATEDIFF with TIMESTAMPDIFF + // Try to figure out whether this is what happened and rewrite it. + // This heuristic may fail... but will work in general. + SourcePositionRange range = getRange(e); + String fragment = this.sources.getFragment(range, false); + String lower = fragment.toLowerCase(Locale.ENGLISH); + if (lower.contains("datediff") && !lower.contains("timestampdiff")) { + message = message.replace("TIMESTAMPDIFF", "DATEDIFF"); + return new CompilationError(message, range); + } } } return new CompilationError(e); diff --git a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/CalciteToDBSPCompiler.java b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/CalciteToDBSPCompiler.java index 6e68c7685ef..eeda8d4ffa2 100644 --- a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/CalciteToDBSPCompiler.java +++ b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/CalciteToDBSPCompiler.java @@ -2661,21 +2661,21 @@ void visitWindow(LogicalWindow window) { RankAggregate first = topKAggregates.get(0); if (this.ancestors.isEmpty()) throw new UnimplementedException(first.call.getAggregation() + " only supported in a TopK pattern", - first.getNode()); + 3934, first.getNode()); RelNode parent = this.getParent(); if (!(parent instanceof LogicalFilter filter)) throw new UnimplementedException(first.call.getAggregation() + " only supported in a TopK pattern", - first.getNode()); + 3934, first.getNode()); RexNode condition = filter.getCondition(); for (RankAggregate aggregate: topKAggregates) { if (condition == null) throw new UnimplementedException(first.call.getAggregation() + " only supported in a TopK pattern", - first.getNode()); + 3934, first.getNode()); WindowCondition winCondition = this.findTopKCondition(condition, aggregate.windowFieldIndex); if (winCondition == null) throw new UnimplementedException(first.call.getAggregation() + " only supported in a TopK pattern", - first.getNode()); + 3934, first.getNode()); condition = winCondition.remaining; if (lastOperator != input) this.addOperator(lastOperator); diff --git a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/aggregates/RankAggregate.java b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/aggregates/RankAggregate.java index a6eaf0636a4..54c6d8d6ff8 100644 --- a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/aggregates/RankAggregate.java +++ b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/aggregates/RankAggregate.java @@ -48,7 +48,6 @@ public class RankAggregate extends WindowAggregates { int windowFieldIndex, AggregateCall call) { super(compiler, window, group, windowFieldIndex); this.call = call; - Utilities.enforce(group.getAggregateCalls(window).size() == 1); } @Override @@ -76,7 +75,7 @@ public DBSPSimpleOperator implement( // Generate comparison function for sorting the vector DBSPType inputRowType = lastOperator.getOutputZSetElementType(); DBSPComparatorExpression comparator = CalciteToDBSPCompiler.generateComparator( - node, group.orderKeys.getFieldCollations(), inputRowType, false); + node, this.group.orderKeys.getFieldCollations(), inputRowType, false); // The rank must be added at the end of the input collection (that's how Calcite expects it). DBSPVariablePath left = DBSPTypeInteger.getType(node, INT64, false).var(); diff --git a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/aggregates/WindowAggregates.java b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/aggregates/WindowAggregates.java index dbfb40e90b8..0a0dadef8dc 100644 --- a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/aggregates/WindowAggregates.java +++ b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/aggregates/WindowAggregates.java @@ -2,6 +2,7 @@ import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.rel.core.Window; +import org.apache.calcite.sql.SqlKind; import org.dbsp.sqlCompiler.circuit.OutputPort; import org.dbsp.sqlCompiler.circuit.operator.DBSPDeindexOperator; import org.dbsp.sqlCompiler.circuit.operator.DBSPMapIndexOperator; @@ -119,7 +120,10 @@ public static WindowAggregates newGroup(CalciteToDBSPCompiler compiler, Window w int windowFieldIndex, AggregateCall call) { WindowAggregates result = switch (call.getAggregation().getKind()) { case RANK, DENSE_RANK, ROW_NUMBER -> { - if (group.getAggregateCalls(window).size() > 1) { + var allCalls = group.getAggregateCalls(window); + var rankCalls = Linq.where(Linq.map(allCalls, c -> c.getAggregation().getKind()), + k -> k == SqlKind.RANK || k == SqlKind.DENSE_RANK || k == SqlKind.ROW_NUMBER); + if (rankCalls.size() > 1) { throw new UnimplementedException("Multiple RANK aggregates per window", CalciteObject.create(window, new SourcePositionRange(call.getParserPosition()))); } diff --git a/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/Regression2Tests.java b/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/Regression2Tests.java index f1fa1d0d241..377952e2ef9 100644 --- a/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/Regression2Tests.java +++ b/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/Regression2Tests.java @@ -742,4 +742,53 @@ WHERE NOT EXISTS ( WHERE foo.out1 = a.col2 );"""); } + + @Test + public void issue5821() { + this.statementsFailingInCompilation(""" + CREATE TABLE shipments ( + shipment_id BIGINT, + warehouse_id BIGINT, + shipped_at TIMESTAMP, + expected_at TIMESTAMP, + delivered_at TIMESTAMP, + shipping_mode VARCHAR + ); + + CREATE MATERIALIZED VIEW bm07_shipping_performance AS + SELECT + warehouse_id, + MAX(DATEDIFF(delivered_at, shipped_at)) AS max_days_in_transit + FROM shipments + GROUP BY + warehouse_id, + TIMESTAMP_TRUNC(shipped_at, WEEK); + """, "Invalid number of arguments to function 'DATEDIFF'."); + } + + @Test + public void issue5822() { + this.statementsFailingInCompilation(""" + CREATE TABLE payments ( + payment_id BIGINT, + customer_id BIGINT, + payment_time TIMESTAMP, + amount DECIMAL(12, 2), + payment_method VARCHAR + ); + + CREATE VIEW bm06_customer_payment_windows AS + SELECT + customer_id, + payment_time, + amount, + ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY payment_time) AS payment_number, + LAG(amount) OVER (PARTITION BY customer_id ORDER BY payment_time) AS previous_amount, + SUM(amount) OVER ( + PARTITION BY customer_id + ORDER BY payment_time + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS running_amount + FROM payments""", "Not yet implemented: ROW_NUMBER only supported in a TopK pattern"); + } }