From ab04f80f74caceae60af969fa68d5b34056e39fe Mon Sep 17 00:00:00 2001 From: Andrew Pilloud Date: Wed, 29 Apr 2020 17:19:42 -0700 Subject: [PATCH] [BEAM-9661] Fix ORDER BY with LIMIT --- .../LimitOffsetScanToOrderByLimitConverter.java | 11 ++++++----- .../sql/zetasql/ZetaSQLDialectSpecTest.java | 13 +++++++++++++ 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/LimitOffsetScanToOrderByLimitConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/LimitOffsetScanToOrderByLimitConverter.java index 150688267bf6f..ab12377155585 100644 --- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/LimitOffsetScanToOrderByLimitConverter.java +++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/LimitOffsetScanToOrderByLimitConverter.java @@ -81,18 +81,19 @@ public RelNode convert(ResolvedLimitOffsetScan zetaNode, List inputs) { /** Collation is a sort order, as in ORDER BY DESCENDING/ASCENDING. */ private static RelCollation getRelCollation(ResolvedOrderByScan node) { + final long inputOffset = node.getColumnList().get(0).getId(); List fieldCollations = node.getOrderByItemList().stream() - .map(LimitOffsetScanToOrderByLimitConverter::orderByItemToFieldCollation) + .map(item -> orderByItemToFieldCollation(item, inputOffset)) .collect(toList()); return RelCollationImpl.of(fieldCollations); } - private static RelFieldCollation orderByItemToFieldCollation(ResolvedOrderByItem item) { - // TODO: might need a column ref mapping here. + private static RelFieldCollation orderByItemToFieldCollation( + ResolvedOrderByItem item, long inputOffset) { Direction sortDirection = item.getIsDescending() ? DESCENDING : ASCENDING; - int fieldIndex = (int) item.getColumnRef().getColumn().getId(); - return new RelFieldCollation(fieldIndex, sortDirection); + final long fieldIndex = item.getColumnRef().getColumn().getId() - inputOffset; + return new RelFieldCollation((int) fieldIndex, sortDirection); } private RelNode convertOrderByScanToLogicalScan(ResolvedOrderByScan node, RelNode input) { diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java index 960a89e657459..1690a0d158c24 100644 --- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java +++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java @@ -1325,6 +1325,19 @@ public void testZetaSQLSelectNullOffsetParam() { BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); } + @Test + public void testZetaSQLSelectFromTableOrderLimit() { + String sql = + "SELECT x, y FROM (SELECT 1 as x, 0 as y UNION ALL SELECT 0, 0 " + + "UNION ALL SELECT 1, 0 UNION ALL SELECT 1, 1) ORDER BY x LIMIT 1"; + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + final Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build(); + PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(0L, 0L).build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + @Test public void testZetaSQLSelectFromTableLimitOffset() { String sql =