From c17ce671755bd7ec7b27f0cfac88a641ac1cd92d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Pe=C3=B1a?= Date: Mon, 23 Aug 2021 16:12:43 -0500 Subject: [PATCH] chore: revert GRACE PERIOD from joins (#8028) --- .../ksql/engine/rewrite/QueryAnonymizer.java | 15 +--- .../engine/rewrite/QueryAnonymizerTest.java | 18 ----- .../ksql/test/QueriesToAnonymizeTest.txt | 2 - ...queriesAreAnonymizedCorrectly.approved.txt | 2 - .../query-validation-tests/joins.json | 71 ------------------- .../io/confluent/ksql/parser/SqlBase.g4 | 4 +- .../io/confluent/ksql/parser/AstBuilder.java | 9 ++- .../confluent/ksql/parser/KsqlParserTest.java | 62 ---------------- 8 files changed, 8 insertions(+), 175 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/rewrite/QueryAnonymizer.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/rewrite/QueryAnonymizer.java index 5755896c87dc..bb764bbd1b26 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/rewrite/QueryAnonymizer.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/rewrite/QueryAnonymizer.java @@ -879,11 +879,6 @@ private static String visitWithinExpression(final WithinExpressionContext contex stringBuilder.append(String.format("%s", anonymizeJoinWindowSize(singleWithin.joinWindowSize()))); - - if (singleWithin.gracePeriodClause() != null) { - stringBuilder.append(String.format(" GRACE PERIOD %s", - anonymizeGracePeriod(singleWithin.gracePeriodClause()))); - } } else if (context instanceof JoinWindowWithBeforeAndAfterContext) { final SqlBaseParser.JoinWindowWithBeforeAndAfterContext beforeAndAfterJoinWindow = (SqlBaseParser.JoinWindowWithBeforeAndAfterContext) context; @@ -891,16 +886,10 @@ private static String visitWithinExpression(final WithinExpressionContext contex stringBuilder.append(String.format("(%s, %s)", anonymizeJoinWindowSize(beforeAndAfterJoinWindow.joinWindowSize(0)), anonymizeJoinWindowSize(beforeAndAfterJoinWindow.joinWindowSize(1)))); - - if (beforeAndAfterJoinWindow.gracePeriodClause() != null) { - stringBuilder.append(String.format(" GRACE PERIOD %s", - anonymizeGracePeriod(beforeAndAfterJoinWindow.gracePeriodClause()))); - } } else { throw new RuntimeException("Expecting either a single join window, ie \"WITHIN 10 " - + "seconds\" or \"WITHIN 10 seconds GRACE PERIOD 2 seconds\", or a join window with " - + "before and after specified, ie. \"WITHIN (10 seconds, 20 seconds)\" or " - + "WITHIN (10 seconds, 20 seconds) GRACE PERIOD 5 seconds"); + + "seconds\", or a join window with " + "before and after specified, ie. " + + "\"WITHIN (10 seconds, 20 seconds)\""); } return stringBuilder.append(' ').toString(); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.java index 3df49a77521e..eb9529d5582f 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/QueryAnonymizerTest.java @@ -133,24 +133,6 @@ public void shouldAnonymizeJoinStatementsCorrectly() { Approvals.verify(output); } - @Test - public void shouldAnonymizeJoinWithGraceStatementsCorrectly() { - final String output = anon.anonymize("INSERT INTO OUTPUT SELECT col1, col2, col3" - + " FROM SOURCE1 S1 JOIN SOURCE2 S2 " - + "WITHIN 1 SECOND GRACE PERIOD 2 SECONDS ON col1.k=col2.k;"); - - Approvals.verify(output); - } - - @Test - public void shouldAnonymizeJoinWithBeforeAndAfterAndGraceStatementsCorrectly() { - final String output = anon.anonymize("INSERT INTO OUTPUT SELECT col1, col2, col3" - + " FROM SOURCE1 S1 JOIN SOURCE2 S2 " - + "WITHIN (1 SECOND, 3 SECONDS) GRACE PERIOD 2 SECONDS ON col1.k=col2.k;"); - - Approvals.verify(output); - } - @Test public void shouldAnonymizeCreateStreamQueryCorrectly() { final String output = anon.anonymize( diff --git a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/QueriesToAnonymizeTest.txt b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/QueriesToAnonymizeTest.txt index 8885fe4545fb..95f46d1da0f6 100644 --- a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/QueriesToAnonymizeTest.txt +++ b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/QueriesToAnonymizeTest.txt @@ -56,6 +56,4 @@ INSERT INTO TARGET SELECT 1 as c1, 2.0 as c2 FROM SOURCE; INSERT INTO SINK SELECT * FROM SOURCE; INSERT INTO OUTPUT SELECT * FROM SOURCE2; INSERT INTO OUTPUT SELECT S1.K AS K, S1.DATA AS DATA_1, S2.DATA AS DATA_2 FROM SOURCE1 S1 JOIN SOURCE2 S2 WITHIN 1 SECOND ON S1.K = S2.K; -INSERT INTO OUTPUT SELECT S1.K AS K, S1.DATA AS DATA_1, S2.DATA AS DATA_2 FROM SOURCE1 S1 JOIN SOURCE2 S2 WITHIN 1 SECOND GRACE PERIOD 2 SECONDS ON S1.K = S2.K; -INSERT INTO OUTPUT SELECT S1.K AS K, S1.DATA AS DATA_1, S2.DATA AS DATA_2 FROM SOURCE1 S1 JOIN SOURCE2 S2 WITHIN (1 SECOND, 3 SECONDS) GRACE PERIOD 2 SECONDS ON S1.K = S2.K; DROP STREAM IF EXISTS input2; diff --git a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/QueryAnonymizerTest.queriesAreAnonymizedCorrectly.approved.txt b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/QueryAnonymizerTest.queriesAreAnonymizedCorrectly.approved.txt index ff7a2fad35c8..3d0a1ba23b02 100644 --- a/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/QueryAnonymizerTest.queriesAreAnonymizedCorrectly.approved.txt +++ b/ksqldb-functional-tests/src/test/java/io/confluent/ksql/test/QueryAnonymizerTest.queriesAreAnonymizedCorrectly.approved.txt @@ -56,6 +56,4 @@ INSERT INTO stream43 SELECT column124, column126 FROM source4; INSERT INTO stream44 SELECT * FROM source4; INSERT INTO stream41 SELECT * FROM source3; INSERT INTO stream41 SELECT column122, column128, column129 FROM source2 INNER JOIN source3 WITHIN '0' SECOND ON anonKey1=anonKey2; -INSERT INTO stream41 SELECT column122, column128, column129 FROM source2 INNER JOIN source3 WITHIN '0' SECOND GRACE PERIOD '0' SECONDS ON anonKey1=anonKey2; -INSERT INTO stream41 SELECT column122, column128, column129 FROM source2 INNER JOIN source3 WITHIN ('0' SECOND, '0' SECONDS) GRACE PERIOD '0' SECONDS ON anonKey1=anonKey2; DROP STREAM IF EXISTS stream6; \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/joins.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/joins.json index cac5f0ad23cc..3d76dfbe9c6b 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/joins.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/joins.json @@ -855,77 +855,6 @@ ] } }, - { - "name": "stream stream inner join with out of order and custom grace period", - "format": ["AVRO", "JSON", "PROTOBUF"], - "statements": [ - "CREATE STREAM LEFT_STREAM (ID BIGINT KEY, L1 varchar) WITH (kafka_topic='left_topic', value_format='{FORMAT}');", - "CREATE STREAM RIGHT_STREAM (ID BIGINT KEY, L2 varchar) WITH (kafka_topic='right_topic', value_format='{FORMAT}');", - "CREATE STREAM INNER_JOIN as SELECT t.id, l1, l2 FROM LEFT_STREAM t join RIGHT_STREAM tt WITHIN 1 minute GRACE PERIOD 1 minute on t.id = tt.id;" - ], - "inputs": [ - {"topic": "left_topic", "key": 0, "value": {"L1": "A"}, "timestamp": 0}, - {"topic": "right_topic", "key": 0, "value": {"L2": "a"}, "timestamp": 60000}, - {"topic": "left_topic", "key": 1, "value": {"L1": "B"}, "timestamp": 330000}, - {"topic": "left_topic", "key": 2, "value": {"L1": "C"}, "timestamp": 90000}, - {"topic": "right_topic", "key": 2, "value": {"L2": "c"}, "timestamp": 90000}, - {"topic": "left_topic", "key": 3, "value": {"L1": "D"}, "timestamp": 60000}, - {"topic": "right_topic", "key": 3, "value": {"L2": "d"}, "timestamp": 60000} - ], - "outputs": [ - {"topic": "INNER_JOIN", "key": 0, "value": {"L1": "A", "L2": "a"}, "timestamp": 60000}, - {"topic": "INNER_JOIN", "key": 2, "value": {"L1": "C", "L2": "c"}, "timestamp": 90000} - ] - }, - { - "name": "stream stream left join with out of order and custom grace period", - "format": ["AVRO", "JSON"], - "statements": [ - "CREATE STREAM LEFT_STREAM (ID BIGINT KEY, L1 varchar) WITH (kafka_topic='left_topic', value_format='{FORMAT}');", - "CREATE STREAM RIGHT_STREAM (ID BIGINT KEY, L2 varchar) WITH (kafka_topic='right_topic', value_format='{FORMAT}');", - "CREATE STREAM LEFT_JOIN as SELECT t.id, l1, l2 FROM LEFT_STREAM t left join RIGHT_STREAM tt WITHIN 1 minute GRACE PERIOD 1 minute on t.id = tt.id;" - ], - "inputs": [ - {"topic": "left_topic", "key": 0, "value": {"L1": "A"}, "timestamp": 0}, - {"topic": "right_topic", "key": 0, "value": {"L2": "a"}, "timestamp": 60000}, - {"topic": "left_topic", "key": 1, "value": {"L1": "B"}, "timestamp": 330000}, - {"topic": "left_topic", "key": 2, "value": {"L1": "C"}, "timestamp": 90000}, - {"topic": "right_topic", "key": 2, "value": {"L2": "c"}, "timestamp": 90000}, - {"topic": "left_topic", "key": 3, "value": {"L1": "D"}, "timestamp": 60000}, - {"topic": "right_topic", "key": 3, "value": {"L2": "d"}, "timestamp": 60000} - ], - "outputs": [ - {"topic": "LEFT_JOIN", "key": 0, "value": {"L1": "A", "L2": "a"}, "timestamp": 60000}, - {"topic": "LEFT_JOIN", "key": 2, "value": {"L1": "C", "L2": null}, "timestamp": 90000}, - {"topic": "LEFT_JOIN", "key": 2, "value": {"L1": "C", "L2": "c"}, "timestamp": 90000}, - {"topic": "LEFT_JOIN", "key": 3, "value": {"L1": "D", "L2": null}, "timestamp": 60000} - ] - }, - { - "name": "stream stream full outer join with out of order and custom grace period", - "format": ["AVRO", "JSON"], - "statements": [ - "CREATE STREAM LEFT_STREAM (ID BIGINT KEY, L1 varchar) WITH (kafka_topic='left_topic', value_format='{FORMAT}');", - "CREATE STREAM RIGHT_STREAM (ID BIGINT KEY, L2 varchar) WITH (kafka_topic='right_topic', value_format='{FORMAT}');", - "CREATE STREAM OUTER_JOIN as SELECT ROWKEY as ID, t.id, tt.id, l1, l2 FROM LEFT_STREAM t full outer join RIGHT_STREAM tt WITHIN 1 minute GRACE PERIOD 1 minute on t.id = tt.id;" - ], - "inputs": [ - {"topic": "left_topic", "key": 0, "value": {"L1": "A"}, "timestamp": 0}, - {"topic": "right_topic", "key": 0, "value": {"L2": "a"}, "timestamp": 60000}, - {"topic": "right_topic", "key": 1, "value": {"L2": "b"}, "timestamp": 330000}, - {"topic": "right_topic", "key": 2, "value": {"L2": "c"}, "timestamp": 90000}, - {"topic": "left_topic", "key": 2, "value": {"L1": "C"}, "timestamp": 90000}, - {"topic": "right_topic", "key": 3, "value": {"L2": "d"}, "timestamp": 60000}, - {"topic": "left_topic", "key": 3, "value": {"L1": "D"}, "timestamp": 60000} - ], - "outputs": [ - {"topic": "OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "TT_ID": 0, "L1": "A", "L2": "a"}, "timestamp": 60000}, - {"topic": "OUTER_JOIN", "key": 2, "value": {"T_ID": null, "TT_ID": 2, "L1": null, "L2": "c"}, "timestamp": 90000}, - {"topic": "OUTER_JOIN", "key": 2, "value": {"T_ID": 2, "TT_ID": 2, "L1": "C", "L2": "c"}, "timestamp": 90000}, - {"topic": "OUTER_JOIN", "key": 3, "value": {"T_ID": null, "TT_ID": 3, "L1": null, "L2": "d"}, "timestamp": 60000}, - {"topic": "OUTER_JOIN", "key": 3, "value": {"T_ID": 3, "TT_ID": null, "L1": "D", "L2": null}, "timestamp": 60000} - ] - }, { "name": "stream stream outer join", "format": ["AVRO", "JSON"], diff --git a/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 b/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 index dfb6b0ee5f47..3ad79bdfa962 100644 --- a/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 +++ b/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 @@ -228,8 +228,8 @@ joinWindow ; withinExpression - : '(' joinWindowSize ',' joinWindowSize ')' (gracePeriodClause)? # joinWindowWithBeforeAndAfter - | joinWindowSize (gracePeriodClause)? # singleJoinWindow + : '(' joinWindowSize ',' joinWindowSize ')' # joinWindowWithBeforeAndAfter + | joinWindowSize # singleJoinWindow ; joinWindowSize diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index bcae9faed02e..e06300b3ce06 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -580,19 +580,18 @@ private static Node visitWithinExpression(final SqlBaseParser.WithinExpressionCo beforeSize = getSizeAndUnitFromJoinWindowSize(singleWithin.joinWindowSize()); afterSize = beforeSize; - gracePeriod = gracePeriodClause(singleWithin.gracePeriodClause()); + gracePeriod = Optional.empty(); } else if (ctx instanceof SqlBaseParser.JoinWindowWithBeforeAndAfterContext) { final SqlBaseParser.JoinWindowWithBeforeAndAfterContext beforeAndAfterJoinWindow = (SqlBaseParser.JoinWindowWithBeforeAndAfterContext) ctx; beforeSize = getSizeAndUnitFromJoinWindowSize(beforeAndAfterJoinWindow.joinWindowSize(0)); afterSize = getSizeAndUnitFromJoinWindowSize(beforeAndAfterJoinWindow.joinWindowSize(1)); - gracePeriod = gracePeriodClause(beforeAndAfterJoinWindow.gracePeriodClause()); + gracePeriod = Optional.empty(); } else { throw new RuntimeException("Expecting either a single join window, ie \"WITHIN 10 " - + "seconds\" or \"WITHIN 10 seconds GRACE PERIOD 2 seconds\", or a join window with " - + "before and after specified, ie. \"WITHIN (10 seconds, 20 seconds)\" or " - + "WITHIN (10 seconds, 20 seconds) GRACE PERIOD 5 seconds"); + + "seconds\", or a join window with " + "before and after specified, ie. " + + "\"WITHIN (10 seconds, 20 seconds)\""); } return new WithinExpression( getLocation(ctx), diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java index 69d4338137af..eb985a11df7e 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java @@ -890,36 +890,6 @@ public void shouldSetWithinExpressionWithSingleWithin() { assertThat(join.getType(), is(JoinedSource.Type.INNER)); } - @Test - public void shouldSetWithinExpressionWithSingleWithinAndGracePeriod() { - final String statementString = "CREATE STREAM foobar as SELECT * from TEST1 JOIN ORDERS WITHIN " - + "10 SECONDS GRACE PERIOD 5 SECONDS ON TEST1.col1 = ORDERS.ORDERID ;"; - - final Statement statement = KsqlParserTestUtil.buildSingleAst(statementString, metaStore) - .getStatement(); - - assertThat(statement, instanceOf(CreateStreamAsSelect.class)); - - final CreateStreamAsSelect createStreamAsSelect = (CreateStreamAsSelect) statement; - - final Query query = createStreamAsSelect.getQuery(); - - assertThat(query.getFrom(), instanceOf(Join.class)); - - final JoinedSource join = Iterables.getOnlyElement(((Join) query.getFrom()).getRights()); - - assertTrue(join.getWithinExpression().isPresent()); - - final WithinExpression withinExpression = join.getWithinExpression().get(); - - assertThat(withinExpression.getBefore(), is(10L)); - assertThat(withinExpression.getAfter(), is(10L)); - assertThat(withinExpression.getBeforeTimeUnit(), is(TimeUnit.SECONDS)); - assertThat(withinExpression.getGrace(), - is(Optional.of(new WindowTimeClause(5, TimeUnit.SECONDS)))); - assertThat(join.getType(), is(JoinedSource.Type.INNER)); - } - @Test public void shouldSetWithinExpressionWithBeforeAndAfter() { @@ -952,38 +922,6 @@ public void shouldSetWithinExpressionWithBeforeAndAfter() { assertThat(join.getType(), is(JoinedSource.Type.INNER)); } - @Test - public void shouldSetWithinExpressionWithBeforeAndAfterAndGracePeriod() { - final String statementString = "CREATE STREAM foobar as SELECT * from TEST1 JOIN ORDERS " - + "WITHIN (10 seconds, 20 minutes) GRACE PERIOD 10 minutes " - + "ON TEST1.col1 = ORDERS.ORDERID ;"; - - final Statement statement = KsqlParserTestUtil.buildSingleAst(statementString, metaStore) - .getStatement(); - - assertThat(statement, instanceOf(CreateStreamAsSelect.class)); - - final CreateStreamAsSelect createStreamAsSelect = (CreateStreamAsSelect) statement; - - final Query query = createStreamAsSelect.getQuery(); - - assertThat(query.getFrom(), instanceOf(Join.class)); - - final JoinedSource join = Iterables.getOnlyElement(((Join) query.getFrom()).getRights()); - - assertTrue(join.getWithinExpression().isPresent()); - - final WithinExpression withinExpression = join.getWithinExpression().get(); - - assertThat(withinExpression.getBefore(), is(10L)); - assertThat(withinExpression.getAfter(), is(20L)); - assertThat(withinExpression.getBeforeTimeUnit(), is(TimeUnit.SECONDS)); - assertThat(withinExpression.getAfterTimeUnit(), is(TimeUnit.MINUTES)); - assertThat(withinExpression.getGrace(), - is(Optional.of(new WindowTimeClause(10, TimeUnit.MINUTES)))); - assertThat(join.getType(), is(JoinedSource.Type.INNER)); - } - @Test public void shouldHaveInnerJoinTypeWithExplicitInnerKeyword() { final String statementString = "CREATE STREAM foobar as SELECT * from TEST1 INNER JOIN TEST2 "