From e2ca2627c1c3fdcc09bfd20b670d287942185eb0 Mon Sep 17 00:00:00 2001 From: Fabian Hueske Date: Mon, 6 Nov 2017 21:22:35 +0100 Subject: [PATCH] [FLINK-8002] [table] Fix join window boundary for LESS_THAN and GREATER_THAN predicates. --- .../table/runtime/join/WindowJoinUtil.scala | 8 +++- .../flink/table/api/stream/sql/JoinTest.scala | 38 ++++++++++++++++++- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala index 863f34251d2dd..60144b2644cc4 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala @@ -321,10 +321,14 @@ object WindowJoinUtil { leftLiteral.get - rightLiteral.get } val boundary = timePred.pred.getKind match { - case SqlKind.LESS_THAN => + case SqlKind.LESS_THAN if timePred.leftInputOnLeftSide => tmpTimeOffset - 1 - case SqlKind.GREATER_THAN => + case SqlKind.LESS_THAN if !timePred.leftInputOnLeftSide => tmpTimeOffset + 1 + case SqlKind.GREATER_THAN if timePred.leftInputOnLeftSide => + tmpTimeOffset + 1 + case SqlKind.GREATER_THAN if !timePred.leftInputOnLeftSide => + tmpTimeOffset - 1 case _ => tmpTimeOffset } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala index 53aff8265d344..95cef755dc2c6 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala @@ -319,19 +319,53 @@ class JoinTest extends TableTestBase { "rowtime") verifyTimeBoundary( - "t1.c - interval '2' second >= t2.c + interval '1' second -" + - "interval '10' second and " + + "t1.c >= t2.c - interval '1' second and " + + "t1.c <= t2.c + interval '10' second", + -1000, + 10000, + "rowtime") + + verifyTimeBoundary( + "t1.c - interval '2' second >= t2.c + interval '1' second - interval '10' second and " + "t1.c <= t2.c + interval '10' second", -7000, 10000, "rowtime") + verifyTimeBoundary( + "t2.c + interval '1' second - interval '10' second <= t1.c - interval '2' second and " + + "t2.c + interval '10' second >= t1.c", + -7000, + 10000, + "rowtime") + verifyTimeBoundary( "t1.c >= t2.c - interval '10' second and " + "t1.c <= t2.c - interval '5' second", -10000, -5000, "rowtime") + + verifyTimeBoundary( + "t2.c - interval '10' second <= t1.c and " + + "t2.c - interval '5' second >= t1.c", + -10000, + -5000, + "rowtime") + + verifyTimeBoundary( + "t1.c > t2.c - interval '2' second and " + + "t1.c < t2.c + interval '2' second", + -1999, + 1999, + "rowtime") + + verifyTimeBoundary( + "t2.c > t1.c - interval '2' second and " + + "t2.c < t1.c + interval '2' second", + -1999, + 1999, + "rowtime") } @Test