Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.BOOLEAN;
import static org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.INTEGER;
import static org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.VARCHAR;
import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithUTCTimeZone;
import static org.apache.beam.sdk.schemas.Schema.toSchema;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
Expand All @@ -36,8 +37,6 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.values.Row;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.Test;

/** UnitTest for {@link BeamSqlCli}. */
Expand Down Expand Up @@ -273,7 +272,6 @@ public void test_time_types() throws Exception {
// test TIME field
assertEquals("15:23:59.000", row.getDateTime("f_time").toString("HH:mm:ss.SSS"));
// test TIMESTAMP field
assertEquals(
new DateTime(2018, 7, 1, 21, 26, 7, 123, DateTimeZone.UTC), row.getDateTime("f_ts"));
assertEquals(parseTimestampWithUTCTimeZone("2018-07-01 21:26:07.123"), row.getDateTime("f_ts"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.extensions.sql;

import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithoutTimeZone;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -228,8 +229,8 @@ private void runAggregationFunctions(PCollection<Row> input) throws Exception {
2.5,
4.0,
1.0,
FORMAT.parseDateTime("2017-01-01 02:04:03"),
FORMAT.parseDateTime("2017-01-01 01:01:03"),
parseTimestampWithoutTimeZone("2017-01-01 02:04:03"),
parseTimestampWithoutTimeZone("2017-01-01 01:01:03"),
1.25,
1.666666667,
1,
Expand Down Expand Up @@ -346,12 +347,12 @@ private void runTumbleWindow(PCollection<Row> input) throws Exception {
.addRows(
0,
3L,
FORMAT.parseDateTime("2017-01-01 01:00:00"),
FORMAT.parseDateTime("2017-01-01 02:00:00"),
parseTimestampWithoutTimeZone("2017-01-01 01:00:00"),
parseTimestampWithoutTimeZone("2017-01-01 02:00:00"),
0,
1L,
FORMAT.parseDateTime("2017-01-01 02:00:00"),
FORMAT.parseDateTime("2017-01-01 03:00:00"))
parseTimestampWithoutTimeZone("2017-01-01 02:00:00"),
parseTimestampWithoutTimeZone("2017-01-01 03:00:00"))
.getRows();

PAssert.that(result).containsInAnyOrder(expectedRows);
Expand All @@ -375,18 +376,18 @@ public void testTriggeredTumble() throws Exception {
inputSchema, SerializableFunctions.identity(), SerializableFunctions.identity())
.addElements(
Row.withSchema(inputSchema)
.addValues(1, FORMAT.parseDateTime("2017-01-01 01:01:01"))
.addValues(1, parseTimestampWithoutTimeZone("2017-01-01 01:01:01"))
.build(),
Row.withSchema(inputSchema)
.addValues(2, FORMAT.parseDateTime("2017-01-01 01:01:01"))
.addValues(2, parseTimestampWithoutTimeZone("2017-01-01 01:01:01"))
.build())
.addElements(
Row.withSchema(inputSchema)
.addValues(3, FORMAT.parseDateTime("2017-01-01 01:01:01"))
.addValues(3, parseTimestampWithoutTimeZone("2017-01-01 01:01:01"))
.build())
.addElements(
Row.withSchema(inputSchema)
.addValues(4, FORMAT.parseDateTime("2017-01-01 01:01:01"))
.addValues(4, parseTimestampWithoutTimeZone("2017-01-01 01:01:01"))
.build())
.advanceWatermarkToInfinity());

Expand Down Expand Up @@ -452,20 +453,20 @@ private void runHopWindow(PCollection<Row> input) throws Exception {
.addRows(
0,
3L,
FORMAT.parseDateTime("2017-01-01 00:30:00"),
FORMAT.parseDateTime("2017-01-01 01:30:00"),
parseTimestampWithoutTimeZone("2017-01-01 00:30:00"),
parseTimestampWithoutTimeZone("2017-01-01 01:30:00"),
0,
3L,
FORMAT.parseDateTime("2017-01-01 01:00:00"),
FORMAT.parseDateTime("2017-01-01 02:00:00"),
parseTimestampWithoutTimeZone("2017-01-01 01:00:00"),
parseTimestampWithoutTimeZone("2017-01-01 02:00:00"),
0,
1L,
FORMAT.parseDateTime("2017-01-01 01:30:00"),
FORMAT.parseDateTime("2017-01-01 02:30:00"),
parseTimestampWithoutTimeZone("2017-01-01 01:30:00"),
parseTimestampWithoutTimeZone("2017-01-01 02:30:00"),
0,
1L,
FORMAT.parseDateTime("2017-01-01 02:00:00"),
FORMAT.parseDateTime("2017-01-01 03:00:00"))
parseTimestampWithoutTimeZone("2017-01-01 02:00:00"),
parseTimestampWithoutTimeZone("2017-01-01 03:00:00"))
.getRows();

PAssert.that(result).containsInAnyOrder(expectedRows);
Expand Down Expand Up @@ -509,12 +510,12 @@ private void runSessionWindow(PCollection<Row> input) throws Exception {
.addRows(
0,
3L,
FORMAT.parseDateTime("2017-01-01 01:01:03"),
FORMAT.parseDateTime("2017-01-01 01:01:03"),
parseTimestampWithoutTimeZone("2017-01-01 01:01:03"),
parseTimestampWithoutTimeZone("2017-01-01 01:01:03"),
0,
1L,
FORMAT.parseDateTime("2017-01-01 02:04:03"),
FORMAT.parseDateTime("2017-01-01 02:04:03"))
parseTimestampWithoutTimeZone("2017-01-01 02:04:03"),
parseTimestampWithoutTimeZone("2017-01-01 02:04:03"))
.getRows();

PAssert.that(result).containsInAnyOrder(expectedRows);
Expand Down Expand Up @@ -575,7 +576,7 @@ public void testUnsupportedGlobalWindowWithDefaultTrigger() {
public void testSupportsGlobalWindowWithCustomTrigger() throws Exception {
pipeline.enableAbandonedNodeEnforcement(false);

DateTime startTime = new DateTime(2017, 1, 1, 0, 0, 0, 0);
DateTime startTime = parseTimestampWithoutTimeZone("2017-1-1 0:0:0");

Schema type =
Schema.builder()
Expand Down Expand Up @@ -645,7 +646,7 @@ public void testSupportsAggregationWithoutProjection() throws Exception {

@Test
public void testSupportsNonGlobalWindowWithCustomTrigger() {
DateTime startTime = new DateTime(2017, 1, 1, 0, 0, 0, 0);
DateTime startTime = parseTimestampWithoutTimeZone("2017-1-1 0:0:0");

Schema type =
Schema.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.extensions.sql;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithoutTimeZone;

import java.math.BigDecimal;
import java.text.ParseException;
Expand All @@ -35,8 +36,6 @@
import org.apache.beam.sdk.values.Row;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
Expand All @@ -48,8 +47,6 @@
* <p>Note that, any change in these records would impact tests in this package.
*/
public class BeamSqlDslBase {
public static final DateTimeFormatter FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");

@Rule public final TestPipeline pipeline = TestPipeline.create();
@Rule public ExpectedException exceptions = ExpectedException.none();

Expand Down Expand Up @@ -100,7 +97,7 @@ public static void prepareClass() throws ParseException {
1.0f,
1.0d,
"string_row1",
FORMAT.parseDateTime("2017-01-01 01:01:03"),
parseTimestampWithoutTimeZone("2017-01-01 01:01:03"),
0,
new BigDecimal(1))
.addRows(
Expand All @@ -111,7 +108,7 @@ public static void prepareClass() throws ParseException {
2.0f,
2.0d,
"string_row2",
FORMAT.parseDateTime("2017-01-01 01:02:03"),
parseTimestampWithoutTimeZone("2017-01-01 01:02:03"),
0,
new BigDecimal(2))
.addRows(
Expand All @@ -122,7 +119,7 @@ public static void prepareClass() throws ParseException {
3.0f,
3.0d,
"string_row3",
FORMAT.parseDateTime("2017-01-01 01:06:03"),
parseTimestampWithoutTimeZone("2017-01-01 01:06:03"),
0,
new BigDecimal(3))
.addRows(
Expand All @@ -133,7 +130,7 @@ public static void prepareClass() throws ParseException {
4.0f,
4.0d,
"第四行",
FORMAT.parseDateTime("2017-01-01 02:04:03"),
parseTimestampWithoutTimeZone("2017-01-01 02:04:03"),
0,
new BigDecimal(4))
.getRows();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.beam.sdk.extensions.sql.TestUtils.tuple;
import static org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS1;
import static org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS2;
import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithoutTimeZone;
import static org.hamcrest.Matchers.stringContainsInOrder;

import java.util.Arrays;
Expand Down Expand Up @@ -288,7 +289,7 @@ public void testRejectsNonGlobalWindowsWithRepeatingTrigger() throws Exception {
}

private PCollection<Row> ordersUnbounded() {
DateTime ts = new DateTime(2017, 1, 1, 1, 0, 0);
DateTime ts = parseTimestampWithoutTimeZone("2017-1-1 1:0:0");

return TestUtils.rowsBuilderOf(
Schema.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.beam.sdk.extensions.sql;

import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseDate;
import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTime;
import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithUTCTimeZone;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -581,7 +584,7 @@ public void testMax() {
.addExpr("MAX(c_float)", 3.0f)
.addExpr("MAX(c_double)", 3.0)
.addExpr("MAX(c_decimal)", BigDecimal.valueOf(3.0))
.addExpr("MAX(ts)", parseTimestamp("1986-04-15 11:35:26"));
.addExpr("MAX(ts)", parseTimestampWithUTCTimeZone("1986-04-15 11:35:26"));
checker.buildRunAndCheck(getAggregationTestPCollection());
}

Expand All @@ -597,7 +600,7 @@ public void testMin() {
.addExpr("MIN(c_float)", 1.0f)
.addExpr("MIN(c_double)", 1.0)
.addExpr("MIN(c_decimal)", BigDecimal.valueOf(1.0))
.addExpr("MIN(ts)", parseTimestamp("1986-02-15 11:35:26"));
.addExpr("MIN(ts)", parseTimestampWithUTCTimeZone("1986-02-15 11:35:26"));
checker.buildRunAndCheck(getAggregationTestPCollection());
}

Expand Down Expand Up @@ -1117,8 +1120,8 @@ public void testBasicDateTimeFunctions() {
public void testFloor() {
ExpressionChecker checker =
new ExpressionChecker()
.addExpr("FLOOR(ts TO MONTH)", parseTimestamp("1986-02-01 00:00:00"))
.addExpr("FLOOR(ts TO YEAR)", parseTimestamp("1986-01-01 00:00:00"))
.addExpr("FLOOR(ts TO MONTH)", parseTimestampWithUTCTimeZone("1986-02-01 00:00:00"))
.addExpr("FLOOR(ts TO YEAR)", parseTimestampWithUTCTimeZone("1986-01-01 00:00:00"))
.addExpr("FLOOR(c_double)", 1.0);

checker.buildRunAndCheck(getFloorCeilingTestPCollection());
Expand All @@ -1130,8 +1133,8 @@ public void testFloor() {
public void testCeil() {
ExpressionChecker checker =
new ExpressionChecker()
.addExpr("CEIL(ts TO MONTH)", parseTimestamp("1986-03-01 00:00:00"))
.addExpr("CEIL(ts TO YEAR)", parseTimestamp("1987-01-01 00:00:00"))
.addExpr("CEIL(ts TO MONTH)", parseTimestampWithUTCTimeZone("1986-03-01 00:00:00"))
.addExpr("CEIL(ts TO YEAR)", parseTimestampWithUTCTimeZone("1987-01-01 00:00:00"))
.addExpr("CEIL(c_double)", 2.0);

checker.buildRunAndCheck(getFloorCeilingTestPCollection());
Expand All @@ -1142,7 +1145,8 @@ public void testCeil() {
public void testFloorAndCeilResolutionLimit() {
thrown.expect(IllegalArgumentException.class);
ExpressionChecker checker =
new ExpressionChecker().addExpr("FLOOR(ts TO DAY)", parseTimestamp("1986-02-01 00:00:00"));
new ExpressionChecker()
.addExpr("FLOOR(ts TO DAY)", parseTimestampWithUTCTimeZone("1986-02-01 00:00:00"));
checker.buildRunAndCheck();
}

Expand All @@ -1153,22 +1157,22 @@ public void testDatetimePlusFunction() {
new ExpressionChecker()
.addExpr(
"TIMESTAMPADD(SECOND, 3, TIMESTAMP '1984-04-19 01:02:03')",
parseTimestamp("1984-04-19 01:02:06"))
parseTimestampWithUTCTimeZone("1984-04-19 01:02:06"))
.addExpr(
"TIMESTAMPADD(MINUTE, 3, TIMESTAMP '1984-04-19 01:02:03')",
parseTimestamp("1984-04-19 01:05:03"))
parseTimestampWithUTCTimeZone("1984-04-19 01:05:03"))
.addExpr(
"TIMESTAMPADD(HOUR, 3, TIMESTAMP '1984-04-19 01:02:03')",
parseTimestamp("1984-04-19 04:02:03"))
parseTimestampWithUTCTimeZone("1984-04-19 04:02:03"))
.addExpr(
"TIMESTAMPADD(DAY, 3, TIMESTAMP '1984-04-19 01:02:03')",
parseTimestamp("1984-04-22 01:02:03"))
parseTimestampWithUTCTimeZone("1984-04-22 01:02:03"))
.addExpr(
"TIMESTAMPADD(MONTH, 2, TIMESTAMP '1984-01-19 01:02:03')",
parseTimestamp("1984-03-19 01:02:03"))
parseTimestampWithUTCTimeZone("1984-03-19 01:02:03"))
.addExpr(
"TIMESTAMPADD(YEAR, 2, TIMESTAMP '1985-01-19 01:02:03')",
parseTimestamp("1987-01-19 01:02:03"));
parseTimestampWithUTCTimeZone("1987-01-19 01:02:03"));
checker.buildRunAndCheck();
}

Expand All @@ -1179,22 +1183,22 @@ public void testDatetimeInfixPlus() {
new ExpressionChecker()
.addExpr(
"TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '3' SECOND",
parseTimestamp("1984-01-19 01:02:06"))
parseTimestampWithUTCTimeZone("1984-01-19 01:02:06"))
.addExpr(
"TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' MINUTE",
parseTimestamp("1984-01-19 01:04:03"))
parseTimestampWithUTCTimeZone("1984-01-19 01:04:03"))
.addExpr(
"TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' HOUR",
parseTimestamp("1984-01-19 03:02:03"))
parseTimestampWithUTCTimeZone("1984-01-19 03:02:03"))
.addExpr(
"TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' DAY",
parseTimestamp("1984-01-21 01:02:03"))
parseTimestampWithUTCTimeZone("1984-01-21 01:02:03"))
.addExpr(
"TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' MONTH",
parseTimestamp("1984-03-19 01:02:03"))
parseTimestampWithUTCTimeZone("1984-03-19 01:02:03"))
.addExpr(
"TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' YEAR",
parseTimestamp("1986-01-19 01:02:03"))
parseTimestampWithUTCTimeZone("1986-01-19 01:02:03"))
.addExpr("DATE '1984-04-19' + INTERVAL '2' DAY", parseDate("1984-04-21"))
.addExpr("DATE '1984-04-19' + INTERVAL '1' MONTH", parseDate("1984-05-19"))
.addExpr("DATE '1984-04-19' + INTERVAL '3' YEAR", parseDate("1987-04-19"))
Expand Down Expand Up @@ -1300,22 +1304,22 @@ public void testTimestampMinusInterval() {
new ExpressionChecker()
.addExpr(
"TIMESTAMP '1984-04-19 01:01:58' - INTERVAL '2' SECOND",
parseTimestamp("1984-04-19 01:01:56"))
parseTimestampWithUTCTimeZone("1984-04-19 01:01:56"))
.addExpr(
"TIMESTAMP '1984-04-19 01:01:58' - INTERVAL '1' MINUTE",
parseTimestamp("1984-04-19 01:00:58"))
parseTimestampWithUTCTimeZone("1984-04-19 01:00:58"))
.addExpr(
"TIMESTAMP '1984-04-19 01:01:58' - INTERVAL '4' HOUR",
parseTimestamp("1984-04-18 21:01:58"))
parseTimestampWithUTCTimeZone("1984-04-18 21:01:58"))
.addExpr(
"TIMESTAMP '1984-04-19 01:01:58' - INTERVAL '5' DAY",
parseTimestamp("1984-04-14 01:01:58"))
parseTimestampWithUTCTimeZone("1984-04-14 01:01:58"))
.addExpr(
"TIMESTAMP '1984-01-19 01:01:58' - INTERVAL '2' MONTH",
parseTimestamp("1983-11-19 01:01:58"))
parseTimestampWithUTCTimeZone("1983-11-19 01:01:58"))
.addExpr(
"TIMESTAMP '1984-01-19 01:01:58' - INTERVAL '1' YEAR",
parseTimestamp("1983-01-19 01:01:58"))
parseTimestampWithUTCTimeZone("1983-01-19 01:01:58"))
.addExpr("DATE '1984-04-19' - INTERVAL '2' DAY", parseDate("1984-04-17"))
.addExpr("DATE '1984-04-19' - INTERVAL '1' MONTH", parseDate("1984-03-19"))
.addExpr("DATE '1984-04-19' - INTERVAL '3' YEAR", parseDate("1981-04-19"))
Expand Down
Loading