diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java index 703e9a08bffa..c6968b3a9ce3 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java @@ -20,6 +20,7 @@ import static org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.BOOLEAN; import static org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.INTEGER; import static org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.VARCHAR; +import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithUTCTimeZone; import static org.apache.beam.sdk.schemas.Schema.toSchema; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; @@ -36,8 +37,6 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.values.Row; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; import org.junit.Test; /** UnitTest for {@link BeamSqlCli}. */ @@ -273,7 +272,6 @@ public void test_time_types() throws Exception { // test TIME field assertEquals("15:23:59.000", row.getDateTime("f_time").toString("HH:mm:ss.SSS")); // test TIMESTAMP field - assertEquals( - new DateTime(2018, 7, 1, 21, 26, 7, 123, DateTimeZone.UTC), row.getDateTime("f_ts")); + assertEquals(parseTimestampWithUTCTimeZone("2018-07-01 21:26:07.123"), row.getDateTime("f_ts")); } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java index 64eff9beaf38..4dc010ff24df 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.extensions.sql; +import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithoutTimeZone; import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -228,8 +229,8 @@ private void runAggregationFunctions(PCollection input) throws Exception { 2.5, 4.0, 1.0, - FORMAT.parseDateTime("2017-01-01 02:04:03"), - FORMAT.parseDateTime("2017-01-01 01:01:03"), + parseTimestampWithoutTimeZone("2017-01-01 02:04:03"), + parseTimestampWithoutTimeZone("2017-01-01 01:01:03"), 1.25, 1.666666667, 1, @@ -346,12 +347,12 @@ private void runTumbleWindow(PCollection input) throws Exception { .addRows( 0, 3L, - FORMAT.parseDateTime("2017-01-01 01:00:00"), - FORMAT.parseDateTime("2017-01-01 02:00:00"), + parseTimestampWithoutTimeZone("2017-01-01 01:00:00"), + parseTimestampWithoutTimeZone("2017-01-01 02:00:00"), 0, 1L, - FORMAT.parseDateTime("2017-01-01 02:00:00"), - FORMAT.parseDateTime("2017-01-01 03:00:00")) + parseTimestampWithoutTimeZone("2017-01-01 02:00:00"), + parseTimestampWithoutTimeZone("2017-01-01 03:00:00")) .getRows(); PAssert.that(result).containsInAnyOrder(expectedRows); @@ -375,18 +376,18 @@ public void testTriggeredTumble() throws Exception { inputSchema, SerializableFunctions.identity(), SerializableFunctions.identity()) .addElements( Row.withSchema(inputSchema) - .addValues(1, FORMAT.parseDateTime("2017-01-01 01:01:01")) + .addValues(1, parseTimestampWithoutTimeZone("2017-01-01 01:01:01")) .build(), Row.withSchema(inputSchema) - .addValues(2, FORMAT.parseDateTime("2017-01-01 01:01:01")) + .addValues(2, parseTimestampWithoutTimeZone("2017-01-01 01:01:01")) .build()) .addElements( Row.withSchema(inputSchema) - .addValues(3, FORMAT.parseDateTime("2017-01-01 01:01:01")) + .addValues(3, parseTimestampWithoutTimeZone("2017-01-01 01:01:01")) .build()) .addElements( Row.withSchema(inputSchema) - .addValues(4, FORMAT.parseDateTime("2017-01-01 01:01:01")) + .addValues(4, parseTimestampWithoutTimeZone("2017-01-01 01:01:01")) .build()) .advanceWatermarkToInfinity()); @@ -452,20 +453,20 @@ private void runHopWindow(PCollection input) throws Exception { .addRows( 0, 3L, - FORMAT.parseDateTime("2017-01-01 00:30:00"), - FORMAT.parseDateTime("2017-01-01 01:30:00"), + parseTimestampWithoutTimeZone("2017-01-01 00:30:00"), + parseTimestampWithoutTimeZone("2017-01-01 01:30:00"), 0, 3L, - FORMAT.parseDateTime("2017-01-01 01:00:00"), - FORMAT.parseDateTime("2017-01-01 02:00:00"), + parseTimestampWithoutTimeZone("2017-01-01 01:00:00"), + parseTimestampWithoutTimeZone("2017-01-01 02:00:00"), 0, 1L, - FORMAT.parseDateTime("2017-01-01 01:30:00"), - FORMAT.parseDateTime("2017-01-01 02:30:00"), + parseTimestampWithoutTimeZone("2017-01-01 01:30:00"), + parseTimestampWithoutTimeZone("2017-01-01 02:30:00"), 0, 1L, - FORMAT.parseDateTime("2017-01-01 02:00:00"), - FORMAT.parseDateTime("2017-01-01 03:00:00")) + parseTimestampWithoutTimeZone("2017-01-01 02:00:00"), + parseTimestampWithoutTimeZone("2017-01-01 03:00:00")) .getRows(); PAssert.that(result).containsInAnyOrder(expectedRows); @@ -509,12 +510,12 @@ private void runSessionWindow(PCollection input) throws Exception { .addRows( 0, 3L, - FORMAT.parseDateTime("2017-01-01 01:01:03"), - FORMAT.parseDateTime("2017-01-01 01:01:03"), + parseTimestampWithoutTimeZone("2017-01-01 01:01:03"), + parseTimestampWithoutTimeZone("2017-01-01 01:01:03"), 0, 1L, - FORMAT.parseDateTime("2017-01-01 02:04:03"), - FORMAT.parseDateTime("2017-01-01 02:04:03")) + parseTimestampWithoutTimeZone("2017-01-01 02:04:03"), + parseTimestampWithoutTimeZone("2017-01-01 02:04:03")) .getRows(); PAssert.that(result).containsInAnyOrder(expectedRows); @@ -575,7 +576,7 @@ public void testUnsupportedGlobalWindowWithDefaultTrigger() { public void testSupportsGlobalWindowWithCustomTrigger() throws Exception { pipeline.enableAbandonedNodeEnforcement(false); - DateTime startTime = new DateTime(2017, 1, 1, 0, 0, 0, 0); + DateTime startTime = parseTimestampWithoutTimeZone("2017-1-1 0:0:0"); Schema type = Schema.builder() @@ -645,7 +646,7 @@ public void testSupportsAggregationWithoutProjection() throws Exception { @Test public void testSupportsNonGlobalWindowWithCustomTrigger() { - DateTime startTime = new DateTime(2017, 1, 1, 0, 0, 0, 0); + DateTime startTime = parseTimestampWithoutTimeZone("2017-1-1 0:0:0"); Schema type = Schema.builder() diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java index 6965b50b173b..3c462952e6ad 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.extensions.sql; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithoutTimeZone; import java.math.BigDecimal; import java.text.ParseException; @@ -35,8 +36,6 @@ import org.apache.beam.sdk.values.Row; import org.joda.time.Duration; import org.joda.time.Instant; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; @@ -48,8 +47,6 @@ *

Note that, any change in these records would impact tests in this package. */ public class BeamSqlDslBase { - public static final DateTimeFormatter FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss"); - @Rule public final TestPipeline pipeline = TestPipeline.create(); @Rule public ExpectedException exceptions = ExpectedException.none(); @@ -100,7 +97,7 @@ public static void prepareClass() throws ParseException { 1.0f, 1.0d, "string_row1", - FORMAT.parseDateTime("2017-01-01 01:01:03"), + parseTimestampWithoutTimeZone("2017-01-01 01:01:03"), 0, new BigDecimal(1)) .addRows( @@ -111,7 +108,7 @@ public static void prepareClass() throws ParseException { 2.0f, 2.0d, "string_row2", - FORMAT.parseDateTime("2017-01-01 01:02:03"), + parseTimestampWithoutTimeZone("2017-01-01 01:02:03"), 0, new BigDecimal(2)) .addRows( @@ -122,7 +119,7 @@ public static void prepareClass() throws ParseException { 3.0f, 3.0d, "string_row3", - FORMAT.parseDateTime("2017-01-01 01:06:03"), + parseTimestampWithoutTimeZone("2017-01-01 01:06:03"), 0, new BigDecimal(3)) .addRows( @@ -133,7 +130,7 @@ public static void prepareClass() throws ParseException { 4.0f, 4.0d, "第四行", - FORMAT.parseDateTime("2017-01-01 02:04:03"), + parseTimestampWithoutTimeZone("2017-01-01 02:04:03"), 0, new BigDecimal(4)) .getRows(); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java index b533151534dc..d6433915382e 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java @@ -20,6 +20,7 @@ import static org.apache.beam.sdk.extensions.sql.TestUtils.tuple; import static org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS1; import static org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS2; +import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithoutTimeZone; import static org.hamcrest.Matchers.stringContainsInOrder; import java.util.Arrays; @@ -288,7 +289,7 @@ public void testRejectsNonGlobalWindowsWithRepeatingTrigger() throws Exception { } private PCollection ordersUnbounded() { - DateTime ts = new DateTime(2017, 1, 1, 1, 0, 0); + DateTime ts = parseTimestampWithoutTimeZone("2017-1-1 1:0:0"); return TestUtils.rowsBuilderOf( Schema.builder() diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java index 192f86da727b..ec1114cd92c4 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java @@ -17,6 +17,9 @@ */ package org.apache.beam.sdk.extensions.sql; +import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseDate; +import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTime; +import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithUTCTimeZone; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -581,7 +584,7 @@ public void testMax() { .addExpr("MAX(c_float)", 3.0f) .addExpr("MAX(c_double)", 3.0) .addExpr("MAX(c_decimal)", BigDecimal.valueOf(3.0)) - .addExpr("MAX(ts)", parseTimestamp("1986-04-15 11:35:26")); + .addExpr("MAX(ts)", parseTimestampWithUTCTimeZone("1986-04-15 11:35:26")); checker.buildRunAndCheck(getAggregationTestPCollection()); } @@ -597,7 +600,7 @@ public void testMin() { .addExpr("MIN(c_float)", 1.0f) .addExpr("MIN(c_double)", 1.0) .addExpr("MIN(c_decimal)", BigDecimal.valueOf(1.0)) - .addExpr("MIN(ts)", parseTimestamp("1986-02-15 11:35:26")); + .addExpr("MIN(ts)", parseTimestampWithUTCTimeZone("1986-02-15 11:35:26")); checker.buildRunAndCheck(getAggregationTestPCollection()); } @@ -1117,8 +1120,8 @@ public void testBasicDateTimeFunctions() { public void testFloor() { ExpressionChecker checker = new ExpressionChecker() - .addExpr("FLOOR(ts TO MONTH)", parseTimestamp("1986-02-01 00:00:00")) - .addExpr("FLOOR(ts TO YEAR)", parseTimestamp("1986-01-01 00:00:00")) + .addExpr("FLOOR(ts TO MONTH)", parseTimestampWithUTCTimeZone("1986-02-01 00:00:00")) + .addExpr("FLOOR(ts TO YEAR)", parseTimestampWithUTCTimeZone("1986-01-01 00:00:00")) .addExpr("FLOOR(c_double)", 1.0); checker.buildRunAndCheck(getFloorCeilingTestPCollection()); @@ -1130,8 +1133,8 @@ public void testFloor() { public void testCeil() { ExpressionChecker checker = new ExpressionChecker() - .addExpr("CEIL(ts TO MONTH)", parseTimestamp("1986-03-01 00:00:00")) - .addExpr("CEIL(ts TO YEAR)", parseTimestamp("1987-01-01 00:00:00")) + .addExpr("CEIL(ts TO MONTH)", parseTimestampWithUTCTimeZone("1986-03-01 00:00:00")) + .addExpr("CEIL(ts TO YEAR)", parseTimestampWithUTCTimeZone("1987-01-01 00:00:00")) .addExpr("CEIL(c_double)", 2.0); checker.buildRunAndCheck(getFloorCeilingTestPCollection()); @@ -1142,7 +1145,8 @@ public void testCeil() { public void testFloorAndCeilResolutionLimit() { thrown.expect(IllegalArgumentException.class); ExpressionChecker checker = - new ExpressionChecker().addExpr("FLOOR(ts TO DAY)", parseTimestamp("1986-02-01 00:00:00")); + new ExpressionChecker() + .addExpr("FLOOR(ts TO DAY)", parseTimestampWithUTCTimeZone("1986-02-01 00:00:00")); checker.buildRunAndCheck(); } @@ -1153,22 +1157,22 @@ public void testDatetimePlusFunction() { new ExpressionChecker() .addExpr( "TIMESTAMPADD(SECOND, 3, TIMESTAMP '1984-04-19 01:02:03')", - parseTimestamp("1984-04-19 01:02:06")) + parseTimestampWithUTCTimeZone("1984-04-19 01:02:06")) .addExpr( "TIMESTAMPADD(MINUTE, 3, TIMESTAMP '1984-04-19 01:02:03')", - parseTimestamp("1984-04-19 01:05:03")) + parseTimestampWithUTCTimeZone("1984-04-19 01:05:03")) .addExpr( "TIMESTAMPADD(HOUR, 3, TIMESTAMP '1984-04-19 01:02:03')", - parseTimestamp("1984-04-19 04:02:03")) + parseTimestampWithUTCTimeZone("1984-04-19 04:02:03")) .addExpr( "TIMESTAMPADD(DAY, 3, TIMESTAMP '1984-04-19 01:02:03')", - parseTimestamp("1984-04-22 01:02:03")) + parseTimestampWithUTCTimeZone("1984-04-22 01:02:03")) .addExpr( "TIMESTAMPADD(MONTH, 2, TIMESTAMP '1984-01-19 01:02:03')", - parseTimestamp("1984-03-19 01:02:03")) + parseTimestampWithUTCTimeZone("1984-03-19 01:02:03")) .addExpr( "TIMESTAMPADD(YEAR, 2, TIMESTAMP '1985-01-19 01:02:03')", - parseTimestamp("1987-01-19 01:02:03")); + parseTimestampWithUTCTimeZone("1987-01-19 01:02:03")); checker.buildRunAndCheck(); } @@ -1179,22 +1183,22 @@ public void testDatetimeInfixPlus() { new ExpressionChecker() .addExpr( "TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '3' SECOND", - parseTimestamp("1984-01-19 01:02:06")) + parseTimestampWithUTCTimeZone("1984-01-19 01:02:06")) .addExpr( "TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' MINUTE", - parseTimestamp("1984-01-19 01:04:03")) + parseTimestampWithUTCTimeZone("1984-01-19 01:04:03")) .addExpr( "TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' HOUR", - parseTimestamp("1984-01-19 03:02:03")) + parseTimestampWithUTCTimeZone("1984-01-19 03:02:03")) .addExpr( "TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' DAY", - parseTimestamp("1984-01-21 01:02:03")) + parseTimestampWithUTCTimeZone("1984-01-21 01:02:03")) .addExpr( "TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' MONTH", - parseTimestamp("1984-03-19 01:02:03")) + parseTimestampWithUTCTimeZone("1984-03-19 01:02:03")) .addExpr( "TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' YEAR", - parseTimestamp("1986-01-19 01:02:03")) + parseTimestampWithUTCTimeZone("1986-01-19 01:02:03")) .addExpr("DATE '1984-04-19' + INTERVAL '2' DAY", parseDate("1984-04-21")) .addExpr("DATE '1984-04-19' + INTERVAL '1' MONTH", parseDate("1984-05-19")) .addExpr("DATE '1984-04-19' + INTERVAL '3' YEAR", parseDate("1987-04-19")) @@ -1300,22 +1304,22 @@ public void testTimestampMinusInterval() { new ExpressionChecker() .addExpr( "TIMESTAMP '1984-04-19 01:01:58' - INTERVAL '2' SECOND", - parseTimestamp("1984-04-19 01:01:56")) + parseTimestampWithUTCTimeZone("1984-04-19 01:01:56")) .addExpr( "TIMESTAMP '1984-04-19 01:01:58' - INTERVAL '1' MINUTE", - parseTimestamp("1984-04-19 01:00:58")) + parseTimestampWithUTCTimeZone("1984-04-19 01:00:58")) .addExpr( "TIMESTAMP '1984-04-19 01:01:58' - INTERVAL '4' HOUR", - parseTimestamp("1984-04-18 21:01:58")) + parseTimestampWithUTCTimeZone("1984-04-18 21:01:58")) .addExpr( "TIMESTAMP '1984-04-19 01:01:58' - INTERVAL '5' DAY", - parseTimestamp("1984-04-14 01:01:58")) + parseTimestampWithUTCTimeZone("1984-04-14 01:01:58")) .addExpr( "TIMESTAMP '1984-01-19 01:01:58' - INTERVAL '2' MONTH", - parseTimestamp("1983-11-19 01:01:58")) + parseTimestampWithUTCTimeZone("1983-11-19 01:01:58")) .addExpr( "TIMESTAMP '1984-01-19 01:01:58' - INTERVAL '1' YEAR", - parseTimestamp("1983-01-19 01:01:58")) + parseTimestampWithUTCTimeZone("1983-01-19 01:01:58")) .addExpr("DATE '1984-04-19' - INTERVAL '2' DAY", parseDate("1984-04-17")) .addExpr("DATE '1984-04-19' - INTERVAL '1' MONTH", parseDate("1984-03-19")) .addExpr("DATE '1984-04-19' - INTERVAL '3' YEAR", parseDate("1981-04-19")) diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java index a4c84ca8dcdf..e78c1136a31e 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.extensions.sql; +import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithoutTimeZone; import static org.hamcrest.Matchers.containsString; import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage; @@ -71,7 +72,9 @@ public void testJodaTimeUdfUdaf() throws Exception { Schema resultType = Schema.builder().addDateTimeField("jodatime").build(); Row row1 = - Row.withSchema(resultType).addValues(FORMAT.parseDateTime("2017-01-01 02:04:03")).build(); + Row.withSchema(resultType) + .addValues(parseTimestampWithoutTimeZone("2017-01-01 02:04:03")) + .build(); String sql1 = "SELECT MAX_JODA(f_timestamp) as jodatime FROM PCOLLECTION"; PCollection result1 = @@ -80,7 +83,9 @@ public void testJodaTimeUdfUdaf() throws Exception { PAssert.that(result1).containsInAnyOrder(row1); Row row2 = - Row.withSchema(resultType).addValues(FORMAT.parseDateTime("2016-12-31 01:01:03")).build(); + Row.withSchema(resultType) + .addValues(parseTimestampWithoutTimeZone("2016-12-31 01:01:03")) + .build(); String sql2 = "SELECT PRE_DAY(f_timestamp) as jodatime FROM PCOLLECTION WHERE f_int=1"; PCollection result2 = diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java index 48cdb310e7c7..a21d350bb342 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.extensions.sql.integrationtest; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithUTCTimeZone; import static org.apache.beam.sdk.extensions.sql.utils.RowAsserts.matchesScalar; import static org.junit.Assert.assertTrue; @@ -53,7 +54,6 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptors; import org.joda.time.DateTime; -import org.joda.time.format.DateTimeFormat; import org.junit.Rule; /** Base class for all built-in functions integration tests. */ @@ -118,7 +118,7 @@ protected PCollection getTestPCollection() { try { return TestBoundedTable.of(ROW_TYPE) .addRows( - parseTimestamp("1986-02-15 11:35:26"), + parseTimestampWithUTCTimeZone("1986-02-15 11:35:26"), (byte) 1, (short) 1, 1, @@ -140,7 +140,7 @@ protected PCollection getTestPCollection() { protected PCollection getFloorCeilingTestPCollection() { try { return TestBoundedTable.of(ROW_TYPE_THREE) - .addRows(parseTimestamp("1986-02-15 11:35:26"), 1.4) + .addRows(parseTimestampWithUTCTimeZone("1986-02-15 11:35:26"), 1.4) .buildIOReader(pipeline.begin()) .setRowSchema(ROW_TYPE_THREE); } catch (Exception e) { @@ -152,7 +152,7 @@ protected PCollection getAggregationTestPCollection() { try { return TestBoundedTable.of(ROW_TYPE_TWO) .addRows( - parseTimestamp("1986-02-15 11:35:26"), + parseTimestampWithUTCTimeZone("1986-02-15 11:35:26"), (byte) 1, (short) 1, 1, @@ -163,7 +163,7 @@ protected PCollection getAggregationTestPCollection() { 7.0, BigDecimal.valueOf(1.0)) .addRows( - parseTimestamp("1986-03-15 11:35:26"), + parseTimestampWithUTCTimeZone("1986-03-15 11:35:26"), (byte) 2, (short) 2, 2, @@ -174,7 +174,7 @@ protected PCollection getAggregationTestPCollection() { 8.0, BigDecimal.valueOf(2.0)) .addRows( - parseTimestamp("1986-04-15 11:35:26"), + parseTimestampWithUTCTimeZone("1986-04-15 11:35:26"), (byte) 3, (short) 3, 3, @@ -192,23 +192,6 @@ protected PCollection getAggregationTestPCollection() { } } - protected static DateTime parseTimestamp(String str) { - return DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC().parseDateTime(str); - } - - protected static DateTime parseDate(String str) { - return DateTimeFormat.forPattern("yyyy-MM-dd").withZoneUTC().parseDateTime(str); - } - - protected static DateTime parseTime(String str) { - // DateTimeFormat does not parse "08:10:10" for pattern "HH:mm:ss.SSS". In this case, '.' must appear. - if (str.indexOf('.') == -1) { - return DateTimeFormat.forPattern("HH:mm:ss").withZoneUTC().parseDateTime(str); - } else { - return DateTimeFormat.forPattern("HH:mm:ss.SSS").withZoneUTC().parseDateTime(str); - } - } - @AutoValue abstract static class ExpressionTestCase { diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java index 98225693e7e1..3b4e5ba86c18 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery; +import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithUTCTimeZone; import static org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN; import static org.apache.beam.sdk.schemas.Schema.FieldType.BYTE; import static org.apache.beam.sdk.schemas.Schema.FieldType.DOUBLE; @@ -50,9 +51,7 @@ import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; -import org.joda.time.DateTime; import org.joda.time.Duration; -import org.joda.time.chrono.ISOChronology; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -146,7 +145,7 @@ public void testSQLRead() { (float) 1.0, 1.0, true, - new DateTime(2018, 05, 28, 20, 17, 40, 123, ISOChronology.getInstanceUTC()), + parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"), "varchar", "char", Arrays.asList("123", "456"))); @@ -208,7 +207,7 @@ public void testSQLTypes() { (float) 1.0, 1.0, true, - new DateTime(2018, 05, 28, 20, 17, 40, 123, ISOChronology.getInstanceUTC()), + parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"), "varchar", "char", Arrays.asList("123", "456")))); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRowTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRowTest.java index b9d80b33a141..eca3a9ba9131 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRowTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRowTest.java @@ -181,8 +181,8 @@ private TimestampedValue message( new PubsubMessage(payload.getBytes(UTF_8), attributes), ts(timestamp)); } - private Instant ts(long timestamp) { - return new DateTime(timestamp).toInstant(); + private Instant ts(long epochMills) { + return new DateTime(epochMills).toInstant(); } private static Set convertToSet( diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/utils/DateTimeUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/utils/DateTimeUtils.java new file mode 100644 index 000000000000..a88edd470f66 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/utils/DateTimeUtils.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.utils; + +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; + +/** DateTimeUtils. */ +public class DateTimeUtils { + + public static DateTime parseTimestampWithUTCTimeZone(String str) { + if (str.indexOf('.') == -1) { + return DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC().parseDateTime(str); + } else { + return DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS").withZoneUTC().parseDateTime(str); + } + } + + public static DateTime parseTimestampWithTimeZone(String str) { + // for example, accept "1990-10-20 13:24:01+0730" + return DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ssZ").parseDateTime(str); + } + + public static DateTime parseTimestampWithoutTimeZone(String str) { + return DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").parseDateTime(str); + } + + public static DateTime parseDate(String str) { + return DateTimeFormat.forPattern("yyyy-MM-dd").withZoneUTC().parseDateTime(str); + } + + public static DateTime parseTime(String str) { + // DateTimeFormat does not parse "08:10:10" for pattern "HH:mm:ss.SSS". In this case, '.' must + // appear. + if (str.indexOf('.') == -1) { + return DateTimeFormat.forPattern("HH:mm:ss").withZoneUTC().parseDateTime(str); + } else { + return DateTimeFormat.forPattern("HH:mm:ss.SSS").withZoneUTC().parseDateTime(str); + } + } +}