From c236ef3c425f0260f104938349c2cd86655e1186 Mon Sep 17 00:00:00 2001 From: Zang Date: Mon, 11 Sep 2017 14:59:52 -0700 Subject: [PATCH 1/2] [BEAM-2804] support TIMESTAMP in sort --- .../extensions/sql/impl/rel/BeamSortRel.java | 1 + .../sql/impl/rel/BeamSortRelTest.java | 41 ++++++++++++++----- 2 files changed, 32 insertions(+), 10 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java index 6a260bb06458..99626aa9e600 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java @@ -213,6 +213,7 @@ public BeamSqlRowComparator(List fieldsIndices, case DOUBLE: case VARCHAR: case DATE: + case TIMESTAMP: Comparable v1 = (Comparable) row1.getFieldValue(fieldIndex); Comparable v2 = (Comparable) row2.getFieldValue(fieldIndex); fieldRet = v1.compareTo(v2); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java index 19ba0d0a4f9a..8605963e75ef 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java @@ -49,16 +49,16 @@ public void prepare() { Types.DOUBLE, "price", Types.TIMESTAMP, "order_time" ).addRows( - 1L, 2, 1.0, new Date(), - 1L, 1, 2.0, new Date(), - 2L, 4, 3.0, new Date(), - 2L, 1, 4.0, new Date(), - 5L, 5, 5.0, new Date(), - 6L, 6, 6.0, new Date(), - 7L, 7, 7.0, new Date(), - 8L, 8888, 8.0, new Date(), - 8L, 999, 9.0, new Date(), - 10L, 100, 10.0, new Date() + 1L, 2, 1.0, new Date(0), + 1L, 1, 2.0, new Date(1), + 2L, 4, 3.0, new Date(2), + 2L, 1, 4.0, new Date(3), + 5L, 5, 5.0, new Date(4), + 6L, 6, 6.0, new Date(5), + 7L, 7, 7.0, new Date(6), + 8L, 8888, 8.0, new Date(7), + 8L, 999, 9.0, new Date(8), + 10L, 100, 10.0, new Date(9) ) ); sqlEnv.registerTable("SUB_ORDER_RAM", @@ -91,6 +91,27 @@ public void testOrderBy_basic() throws Exception { pipeline.run().waitUntilFinish(); } + @Test + public void testOrderBy_timestamp() throws Exception { + String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS " + + "ORDER BY order_time desc limit 4"; + + PCollection rows = compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder(TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 7L, 7, 7.0, + 8L, 8888, 8.0, + 8L, 999, 9.0, + 10L, 100, 10.0 + ).getRows()); + pipeline.run().waitUntilFinish(); + } + @Test public void testOrderBy_nullsFirst() throws Exception { sqlEnv.registerTable("ORDER_DETAILS", From f0861d1ddbbdc28a1939724dceaf3e36f762d3ab Mon Sep 17 00:00:00 2001 From: Zang Date: Mon, 11 Sep 2017 16:59:07 -0700 Subject: [PATCH 2/2] reuse prepared input for testOrderBy_timestamp() test case --- .../extensions/sql/impl/rel/BeamSortRelTest.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java index 8605963e75ef..bab52967ef9f 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java @@ -93,8 +93,7 @@ public void testOrderBy_basic() throws Exception { @Test public void testOrderBy_timestamp() throws Exception { - String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " - + " order_id, site_id, price " + String sql = "SELECT order_id, site_id, price, order_time " + "FROM ORDER_DETAILS " + "ORDER BY order_time desc limit 4"; @@ -102,12 +101,13 @@ public void testOrderBy_timestamp() throws Exception { PAssert.that(rows).containsInAnyOrder(TestUtils.RowsBuilder.of( Types.BIGINT, "order_id", Types.INTEGER, "site_id", - Types.DOUBLE, "price" + Types.DOUBLE, "price", + Types.TIMESTAMP, "order_time" ).addRows( - 7L, 7, 7.0, - 8L, 8888, 8.0, - 8L, 999, 9.0, - 10L, 100, 10.0 + 7L, 7, 7.0, new Date(6), + 8L, 8888, 8.0, new Date(7), + 8L, 999, 9.0, new Date(8), + 10L, 100, 10.0, new Date(9) ).getRows()); pipeline.run().waitUntilFinish(); }