Skip to content

Commit

Permalink
chore: revert GRACE PERIOD from joins (#8028)
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Aug 23, 2021
1 parent 1d5c655 commit c17ce67
Show file tree
Hide file tree
Showing 8 changed files with 8 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -879,28 +879,17 @@ private static String visitWithinExpression(final WithinExpressionContext contex

stringBuilder.append(String.format("%s",
anonymizeJoinWindowSize(singleWithin.joinWindowSize())));

if (singleWithin.gracePeriodClause() != null) {
stringBuilder.append(String.format(" GRACE PERIOD %s",
anonymizeGracePeriod(singleWithin.gracePeriodClause())));
}
} else if (context instanceof JoinWindowWithBeforeAndAfterContext) {
final SqlBaseParser.JoinWindowWithBeforeAndAfterContext beforeAndAfterJoinWindow
= (SqlBaseParser.JoinWindowWithBeforeAndAfterContext) context;

stringBuilder.append(String.format("(%s, %s)",
anonymizeJoinWindowSize(beforeAndAfterJoinWindow.joinWindowSize(0)),
anonymizeJoinWindowSize(beforeAndAfterJoinWindow.joinWindowSize(1))));

if (beforeAndAfterJoinWindow.gracePeriodClause() != null) {
stringBuilder.append(String.format(" GRACE PERIOD %s",
anonymizeGracePeriod(beforeAndAfterJoinWindow.gracePeriodClause())));
}
} else {
throw new RuntimeException("Expecting either a single join window, ie \"WITHIN 10 "
+ "seconds\" or \"WITHIN 10 seconds GRACE PERIOD 2 seconds\", or a join window with "
+ "before and after specified, ie. \"WITHIN (10 seconds, 20 seconds)\" or "
+ "WITHIN (10 seconds, 20 seconds) GRACE PERIOD 5 seconds");
+ "seconds\", or a join window with " + "before and after specified, ie. "
+ "\"WITHIN (10 seconds, 20 seconds)\"");
}

return stringBuilder.append(' ').toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,24 +133,6 @@ public void shouldAnonymizeJoinStatementsCorrectly() {
Approvals.verify(output);
}

@Test
public void shouldAnonymizeJoinWithGraceStatementsCorrectly() {
final String output = anon.anonymize("INSERT INTO OUTPUT SELECT col1, col2, col3"
+ " FROM SOURCE1 S1 JOIN SOURCE2 S2 "
+ "WITHIN 1 SECOND GRACE PERIOD 2 SECONDS ON col1.k=col2.k;");

Approvals.verify(output);
}

@Test
public void shouldAnonymizeJoinWithBeforeAndAfterAndGraceStatementsCorrectly() {
final String output = anon.anonymize("INSERT INTO OUTPUT SELECT col1, col2, col3"
+ " FROM SOURCE1 S1 JOIN SOURCE2 S2 "
+ "WITHIN (1 SECOND, 3 SECONDS) GRACE PERIOD 2 SECONDS ON col1.k=col2.k;");

Approvals.verify(output);
}

@Test
public void shouldAnonymizeCreateStreamQueryCorrectly() {
final String output = anon.anonymize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,4 @@ INSERT INTO TARGET SELECT 1 as c1, 2.0 as c2 FROM SOURCE;
INSERT INTO SINK SELECT * FROM SOURCE;
INSERT INTO OUTPUT SELECT * FROM SOURCE2;
INSERT INTO OUTPUT SELECT S1.K AS K, S1.DATA AS DATA_1, S2.DATA AS DATA_2 FROM SOURCE1 S1 JOIN SOURCE2 S2 WITHIN 1 SECOND ON S1.K = S2.K;
INSERT INTO OUTPUT SELECT S1.K AS K, S1.DATA AS DATA_1, S2.DATA AS DATA_2 FROM SOURCE1 S1 JOIN SOURCE2 S2 WITHIN 1 SECOND GRACE PERIOD 2 SECONDS ON S1.K = S2.K;
INSERT INTO OUTPUT SELECT S1.K AS K, S1.DATA AS DATA_1, S2.DATA AS DATA_2 FROM SOURCE1 S1 JOIN SOURCE2 S2 WITHIN (1 SECOND, 3 SECONDS) GRACE PERIOD 2 SECONDS ON S1.K = S2.K;
DROP STREAM IF EXISTS input2;
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,4 @@ INSERT INTO stream43 SELECT column124, column126 FROM source4;
INSERT INTO stream44 SELECT * FROM source4;
INSERT INTO stream41 SELECT * FROM source3;
INSERT INTO stream41 SELECT column122, column128, column129 FROM source2 INNER JOIN source3 WITHIN '0' SECOND ON anonKey1=anonKey2;
INSERT INTO stream41 SELECT column122, column128, column129 FROM source2 INNER JOIN source3 WITHIN '0' SECOND GRACE PERIOD '0' SECONDS ON anonKey1=anonKey2;
INSERT INTO stream41 SELECT column122, column128, column129 FROM source2 INNER JOIN source3 WITHIN ('0' SECOND, '0' SECONDS) GRACE PERIOD '0' SECONDS ON anonKey1=anonKey2;
DROP STREAM IF EXISTS stream6;
Original file line number Diff line number Diff line change
Expand Up @@ -855,77 +855,6 @@
]
}
},
{
"name": "stream stream inner join with out of order and custom grace period",
"format": ["AVRO", "JSON", "PROTOBUF"],
"statements": [
"CREATE STREAM LEFT_STREAM (ID BIGINT KEY, L1 varchar) WITH (kafka_topic='left_topic', value_format='{FORMAT}');",
"CREATE STREAM RIGHT_STREAM (ID BIGINT KEY, L2 varchar) WITH (kafka_topic='right_topic', value_format='{FORMAT}');",
"CREATE STREAM INNER_JOIN as SELECT t.id, l1, l2 FROM LEFT_STREAM t join RIGHT_STREAM tt WITHIN 1 minute GRACE PERIOD 1 minute on t.id = tt.id;"
],
"inputs": [
{"topic": "left_topic", "key": 0, "value": {"L1": "A"}, "timestamp": 0},
{"topic": "right_topic", "key": 0, "value": {"L2": "a"}, "timestamp": 60000},
{"topic": "left_topic", "key": 1, "value": {"L1": "B"}, "timestamp": 330000},
{"topic": "left_topic", "key": 2, "value": {"L1": "C"}, "timestamp": 90000},
{"topic": "right_topic", "key": 2, "value": {"L2": "c"}, "timestamp": 90000},
{"topic": "left_topic", "key": 3, "value": {"L1": "D"}, "timestamp": 60000},
{"topic": "right_topic", "key": 3, "value": {"L2": "d"}, "timestamp": 60000}
],
"outputs": [
{"topic": "INNER_JOIN", "key": 0, "value": {"L1": "A", "L2": "a"}, "timestamp": 60000},
{"topic": "INNER_JOIN", "key": 2, "value": {"L1": "C", "L2": "c"}, "timestamp": 90000}
]
},
{
"name": "stream stream left join with out of order and custom grace period",
"format": ["AVRO", "JSON"],
"statements": [
"CREATE STREAM LEFT_STREAM (ID BIGINT KEY, L1 varchar) WITH (kafka_topic='left_topic', value_format='{FORMAT}');",
"CREATE STREAM RIGHT_STREAM (ID BIGINT KEY, L2 varchar) WITH (kafka_topic='right_topic', value_format='{FORMAT}');",
"CREATE STREAM LEFT_JOIN as SELECT t.id, l1, l2 FROM LEFT_STREAM t left join RIGHT_STREAM tt WITHIN 1 minute GRACE PERIOD 1 minute on t.id = tt.id;"
],
"inputs": [
{"topic": "left_topic", "key": 0, "value": {"L1": "A"}, "timestamp": 0},
{"topic": "right_topic", "key": 0, "value": {"L2": "a"}, "timestamp": 60000},
{"topic": "left_topic", "key": 1, "value": {"L1": "B"}, "timestamp": 330000},
{"topic": "left_topic", "key": 2, "value": {"L1": "C"}, "timestamp": 90000},
{"topic": "right_topic", "key": 2, "value": {"L2": "c"}, "timestamp": 90000},
{"topic": "left_topic", "key": 3, "value": {"L1": "D"}, "timestamp": 60000},
{"topic": "right_topic", "key": 3, "value": {"L2": "d"}, "timestamp": 60000}
],
"outputs": [
{"topic": "LEFT_JOIN", "key": 0, "value": {"L1": "A", "L2": "a"}, "timestamp": 60000},
{"topic": "LEFT_JOIN", "key": 2, "value": {"L1": "C", "L2": null}, "timestamp": 90000},
{"topic": "LEFT_JOIN", "key": 2, "value": {"L1": "C", "L2": "c"}, "timestamp": 90000},
{"topic": "LEFT_JOIN", "key": 3, "value": {"L1": "D", "L2": null}, "timestamp": 60000}
]
},
{
"name": "stream stream full outer join with out of order and custom grace period",
"format": ["AVRO", "JSON"],
"statements": [
"CREATE STREAM LEFT_STREAM (ID BIGINT KEY, L1 varchar) WITH (kafka_topic='left_topic', value_format='{FORMAT}');",
"CREATE STREAM RIGHT_STREAM (ID BIGINT KEY, L2 varchar) WITH (kafka_topic='right_topic', value_format='{FORMAT}');",
"CREATE STREAM OUTER_JOIN as SELECT ROWKEY as ID, t.id, tt.id, l1, l2 FROM LEFT_STREAM t full outer join RIGHT_STREAM tt WITHIN 1 minute GRACE PERIOD 1 minute on t.id = tt.id;"
],
"inputs": [
{"topic": "left_topic", "key": 0, "value": {"L1": "A"}, "timestamp": 0},
{"topic": "right_topic", "key": 0, "value": {"L2": "a"}, "timestamp": 60000},
{"topic": "right_topic", "key": 1, "value": {"L2": "b"}, "timestamp": 330000},
{"topic": "right_topic", "key": 2, "value": {"L2": "c"}, "timestamp": 90000},
{"topic": "left_topic", "key": 2, "value": {"L1": "C"}, "timestamp": 90000},
{"topic": "right_topic", "key": 3, "value": {"L2": "d"}, "timestamp": 60000},
{"topic": "left_topic", "key": 3, "value": {"L1": "D"}, "timestamp": 60000}
],
"outputs": [
{"topic": "OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "TT_ID": 0, "L1": "A", "L2": "a"}, "timestamp": 60000},
{"topic": "OUTER_JOIN", "key": 2, "value": {"T_ID": null, "TT_ID": 2, "L1": null, "L2": "c"}, "timestamp": 90000},
{"topic": "OUTER_JOIN", "key": 2, "value": {"T_ID": 2, "TT_ID": 2, "L1": "C", "L2": "c"}, "timestamp": 90000},
{"topic": "OUTER_JOIN", "key": 3, "value": {"T_ID": null, "TT_ID": 3, "L1": null, "L2": "d"}, "timestamp": 60000},
{"topic": "OUTER_JOIN", "key": 3, "value": {"T_ID": 3, "TT_ID": null, "L1": "D", "L2": null}, "timestamp": 60000}
]
},
{
"name": "stream stream outer join",
"format": ["AVRO", "JSON"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,8 @@ joinWindow
;

withinExpression
: '(' joinWindowSize ',' joinWindowSize ')' (gracePeriodClause)? # joinWindowWithBeforeAndAfter
| joinWindowSize (gracePeriodClause)? # singleJoinWindow
: '(' joinWindowSize ',' joinWindowSize ')' # joinWindowWithBeforeAndAfter
| joinWindowSize # singleJoinWindow
;

joinWindowSize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,19 +580,18 @@ private static Node visitWithinExpression(final SqlBaseParser.WithinExpressionCo

beforeSize = getSizeAndUnitFromJoinWindowSize(singleWithin.joinWindowSize());
afterSize = beforeSize;
gracePeriod = gracePeriodClause(singleWithin.gracePeriodClause());
gracePeriod = Optional.empty();
} else if (ctx instanceof SqlBaseParser.JoinWindowWithBeforeAndAfterContext) {
final SqlBaseParser.JoinWindowWithBeforeAndAfterContext beforeAndAfterJoinWindow
= (SqlBaseParser.JoinWindowWithBeforeAndAfterContext) ctx;

beforeSize = getSizeAndUnitFromJoinWindowSize(beforeAndAfterJoinWindow.joinWindowSize(0));
afterSize = getSizeAndUnitFromJoinWindowSize(beforeAndAfterJoinWindow.joinWindowSize(1));
gracePeriod = gracePeriodClause(beforeAndAfterJoinWindow.gracePeriodClause());
gracePeriod = Optional.empty();
} else {
throw new RuntimeException("Expecting either a single join window, ie \"WITHIN 10 "
+ "seconds\" or \"WITHIN 10 seconds GRACE PERIOD 2 seconds\", or a join window with "
+ "before and after specified, ie. \"WITHIN (10 seconds, 20 seconds)\" or "
+ "WITHIN (10 seconds, 20 seconds) GRACE PERIOD 5 seconds");
+ "seconds\", or a join window with " + "before and after specified, ie. "
+ "\"WITHIN (10 seconds, 20 seconds)\"");
}
return new WithinExpression(
getLocation(ctx),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -890,36 +890,6 @@ public void shouldSetWithinExpressionWithSingleWithin() {
assertThat(join.getType(), is(JoinedSource.Type.INNER));
}

@Test
public void shouldSetWithinExpressionWithSingleWithinAndGracePeriod() {
final String statementString = "CREATE STREAM foobar as SELECT * from TEST1 JOIN ORDERS WITHIN "
+ "10 SECONDS GRACE PERIOD 5 SECONDS ON TEST1.col1 = ORDERS.ORDERID ;";

final Statement statement = KsqlParserTestUtil.buildSingleAst(statementString, metaStore)
.getStatement();

assertThat(statement, instanceOf(CreateStreamAsSelect.class));

final CreateStreamAsSelect createStreamAsSelect = (CreateStreamAsSelect) statement;

final Query query = createStreamAsSelect.getQuery();

assertThat(query.getFrom(), instanceOf(Join.class));

final JoinedSource join = Iterables.getOnlyElement(((Join) query.getFrom()).getRights());

assertTrue(join.getWithinExpression().isPresent());

final WithinExpression withinExpression = join.getWithinExpression().get();

assertThat(withinExpression.getBefore(), is(10L));
assertThat(withinExpression.getAfter(), is(10L));
assertThat(withinExpression.getBeforeTimeUnit(), is(TimeUnit.SECONDS));
assertThat(withinExpression.getGrace(),
is(Optional.of(new WindowTimeClause(5, TimeUnit.SECONDS))));
assertThat(join.getType(), is(JoinedSource.Type.INNER));
}


@Test
public void shouldSetWithinExpressionWithBeforeAndAfter() {
Expand Down Expand Up @@ -952,38 +922,6 @@ public void shouldSetWithinExpressionWithBeforeAndAfter() {
assertThat(join.getType(), is(JoinedSource.Type.INNER));
}

@Test
public void shouldSetWithinExpressionWithBeforeAndAfterAndGracePeriod() {
final String statementString = "CREATE STREAM foobar as SELECT * from TEST1 JOIN ORDERS "
+ "WITHIN (10 seconds, 20 minutes) GRACE PERIOD 10 minutes "
+ "ON TEST1.col1 = ORDERS.ORDERID ;";

final Statement statement = KsqlParserTestUtil.buildSingleAst(statementString, metaStore)
.getStatement();

assertThat(statement, instanceOf(CreateStreamAsSelect.class));

final CreateStreamAsSelect createStreamAsSelect = (CreateStreamAsSelect) statement;

final Query query = createStreamAsSelect.getQuery();

assertThat(query.getFrom(), instanceOf(Join.class));

final JoinedSource join = Iterables.getOnlyElement(((Join) query.getFrom()).getRights());

assertTrue(join.getWithinExpression().isPresent());

final WithinExpression withinExpression = join.getWithinExpression().get();

assertThat(withinExpression.getBefore(), is(10L));
assertThat(withinExpression.getAfter(), is(20L));
assertThat(withinExpression.getBeforeTimeUnit(), is(TimeUnit.SECONDS));
assertThat(withinExpression.getAfterTimeUnit(), is(TimeUnit.MINUTES));
assertThat(withinExpression.getGrace(),
is(Optional.of(new WindowTimeClause(10, TimeUnit.MINUTES))));
assertThat(join.getType(), is(JoinedSource.Type.INNER));
}

@Test
public void shouldHaveInnerJoinTypeWithExplicitInnerKeyword() {
final String statementString = "CREATE STREAM foobar as SELECT * from TEST1 INNER JOIN TEST2 "
Expand Down

0 comments on commit c17ce67

Please sign in to comment.