From 68aaf7107e47edc18b9c02c9e17b97358e03af90 Mon Sep 17 00:00:00 2001 From: Zang Date: Thu, 7 Sep 2017 13:51:46 -0700 Subject: [PATCH] support ORDER BY [TIMESTAMP] with test case -- testOrderBy_timestamp -- in sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java --- .../extensions/sql/impl/rel/BeamSortRel.java | 1 + .../sql/impl/rel/BeamSortRelTest.java | 40 +++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java index 6a260bb06458..99626aa9e600 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java @@ -213,6 +213,7 @@ public BeamSqlRowComparator(List fieldsIndices, case DOUBLE: case VARCHAR: case DATE: + case TIMESTAMP: Comparable v1 = (Comparable) row1.getFieldValue(fieldIndex); Comparable v2 = (Comparable) row2.getFieldValue(fieldIndex); fieldRet = v1.compareTo(v2); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java index 19ba0d0a4f9a..f8dbc4cf86b4 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java @@ -91,6 +91,46 @@ public void testOrderBy_basic() throws Exception { pipeline.run().waitUntilFinish(); } + @Test + public void testOrderBy_timestamp() throws Exception { + sqlEnv.registerTable("ORDER_DETAILS", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.TIMESTAMP, "order_time" + ).addRows( + 4L, 4, new Date(0), + 3L, 3, new Date(1), + 2L, 2, new Date(2), + 1L, 1, new Date(3) + ) + ); + sqlEnv.registerTable("SUB_ORDER_RAM", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id" + ) + ); + + String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id) SELECT " + + " order_id, site_id " + + "FROM ORDER_DETAILS " + + "ORDER BY order_time desc limit 3"; + + PCollection rows = compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id" + ).addRows( + 1L, 1, + 2L, 2, + 3L, 3 + ).getRows() + ); + pipeline.run().waitUntilFinish(); + } + @Test public void testOrderBy_nullsFirst() throws Exception { sqlEnv.registerTable("ORDER_DETAILS",