Skip to content

Commit

Permalink
[BEAM-9661] Fix ORDER BY with LIMIT
Browse files Browse the repository at this point in the history
  • Loading branch information
apilloud committed May 4, 2020
1 parent 4d38e39 commit f8134d7
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 5 deletions.
Expand Up @@ -81,18 +81,18 @@ public RelNode convert(ResolvedLimitOffsetScan zetaNode, List<RelNode> inputs) {

/** Collation is a sort order, as in ORDER BY DESCENDING/ASCENDING. */
private static RelCollation getRelCollation(ResolvedOrderByScan node) {
final long inputOffset = node.getColumnList().get(0).getId();
List<RelFieldCollation> fieldCollations =
node.getOrderByItemList().stream()
.map(LimitOffsetScanToOrderByLimitConverter::orderByItemToFieldCollation)
.map(item -> orderByItemToFieldCollation(item, inputOffset))
.collect(toList());
return RelCollationImpl.of(fieldCollations);
}

private static RelFieldCollation orderByItemToFieldCollation(ResolvedOrderByItem item) {
// TODO: might need a column ref mapping here.
private static RelFieldCollation orderByItemToFieldCollation(ResolvedOrderByItem item, long inputOffset) {
Direction sortDirection = item.getIsDescending() ? DESCENDING : ASCENDING;
int fieldIndex = (int) item.getColumnRef().getColumn().getId();
return new RelFieldCollation(fieldIndex, sortDirection);
final long fieldIndex = item.getColumnRef().getColumn().getId() - inputOffset;
return new RelFieldCollation((int) fieldIndex, sortDirection);
}

private RelNode convertOrderByScanToLogicalScan(ResolvedOrderByScan node, RelNode input) {
Expand Down
Expand Up @@ -1325,6 +1325,19 @@ public void testZetaSQLSelectNullOffsetParam() {
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params);
}

@Test
public void testZetaSQLSelectFromTableOrderLimit() {
String sql =
"SELECT x, y FROM (SELECT 1 as x, 0 as y UNION ALL SELECT 0, 0 " +
"UNION ALL SELECT 1, 0 UNION ALL SELECT 1, 1) ORDER BY x LIMIT 1";
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
final Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(0L, 0L).build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}

@Test
public void testZetaSQLSelectFromTableLimitOffset() {
String sql =
Expand Down

0 comments on commit f8134d7

Please sign in to comment.