feat: Re-enable GRACE period with new stream-stream joins semantics #8236

Merged
merged 3 commits into from
Oct 7, 2021
Expand Up @@ -13,7 +13,10 @@ CREATE [OR REPLACE] STREAM stream_name
[WITH ( property_name = expression [, ...] )]
AS SELECT select_expr [, ...]
FROM from_stream
[[ LEFT | FULL | INNER ] JOIN [join_table | join_stream] [ WITHIN [(before TIMEUNIT, after TIMEUNIT) | N TIMEUNIT] ] ON join_criteria]*
[[ LEFT | FULL | INNER ]
JOIN [join_table | join_stream]
[WITHIN [<size> <timeunit> | (<before_size> <timeunit>, <after_size> <timeunit>)] [GRACE PERIOD <grace_size> <timeunit>]]
ON join_criteria]*
[ WHERE condition ]
[PARTITION BY column_name]
EMIT CHANGES;
5 changes: 4 additions & 1 deletion docs/developer-guide/ksqldb-reference/insert-into.md
Expand Up @@ -16,7 +16,10 @@ Synopsis
INSERT INTO stream_name
SELECT select_expr [, ...]
FROM from_stream
[ LEFT | FULL | INNER ] JOIN [join_table | join_stream] [ WITHIN [(before TIMEUNIT, after TIMEUNIT) | N TIMEUNIT] ] ON join_criteria
[ LEFT | FULL | INNER ]
JOIN [join_table | join_stream]
[WITHIN [<size> <timeunit> | (<before_size> <timeunit>, <after_size> <timeunit>)] [GRACE PERIOD <grace_size> <timeunit>]]
ON join_criteria
[ WHERE condition ]
[ PARTITION BY new_key_expr [, ...] ]
EMIT CHANGES;
19 changes: 12 additions & 7 deletions docs/developer-guide/ksqldb-reference/quick-reference.md
Expand Up @@ -168,9 +168,10 @@ CREATE STREAM stream_name
[WITH ( property_name = expression [, ...] )]
AS SELECT select_expr [, ...]
FROM from_stream
[[ LEFT | FULL | INNER ] JOIN [join_table | join_stream]
[ WITHIN [(before TIMEUNIT, after TIMEUNIT) | N TIMEUNIT] ]
ON join_criteria]*
[[ LEFT | FULL | INNER ]
JOIN [join_table | join_stream]
[WITHIN [<size> <timeunit> | (<before_size> <timeunit>, <after_size> <timeunit>)] [GRACE PERIOD <grace_size> <timeunit>]]
ON join_criteria]*
[ WHERE condition ]
[PARTITION BY new_key_expr [, ...]]
EMIT CHANGES;
Expand Down Expand Up @@ -386,9 +387,10 @@ Stream the result of a SELECT query into an existing stream and its underlying
INSERT INTO stream_name
SELECT select_expr [, ...]
FROM from_stream
[ LEFT | FULL | INNER ] JOIN [join_table | join_stream]
[ WITHIN [(before TIMEUNIT, after TIMEUNIT) | N TIMEUNIT] ]
ON join_criteria
[ LEFT | FULL | INNER ]
JOIN [join_table | join_stream]
[WITHIN [<size> <timeunit> | (<before_size> <timeunit>, <after_size> <timeunit>)] [GRACE PERIOD <grace_size> <timeunit>]]
ON join_criteria
[ WHERE condition ]
[ PARTITION BY new_key_expr [, ...] ]
EMIT CHANGES;
Expand Down Expand Up @@ -486,7 +488,10 @@ information, see [SELECT (Push Query)](../../ksqldb-reference/select-push-query)
```sql
SELECT select_expr [, ...]
FROM from_item
[[ LEFT | FULL | INNER ] JOIN join_item ON [ WITHIN [(before TIMEUNIT, after TIMEUNIT) | N TIMEUNIT] ] join_criteria]*
[[ LEFT | FULL | INNER ]
JOIN join_item
[WITHIN [<size> <timeunit> | (<before_size> <timeunit>, <after_size> <timeunit>)] [GRACE PERIOD <grace_size> <timeunit>]]
ON join_criteria]*
[ WINDOW window_expression ]
[ WHERE condition ]
[ GROUP BY grouping_expression [, ...] ]
33 changes: 31 additions & 2 deletions docs/developer-guide/ksqldb-reference/select-push-query.md
Expand Up @@ -11,7 +11,10 @@ keywords: ksqlDB, select, push query
```sql
SELECT select_expr [, ...]
FROM from_item
[[ LEFT | FULL | INNER ] JOIN join_item ON [ WITHIN [(before TIMEUNIT, after TIMEUNIT) | N TIMEUNIT] ] join_criteria]*
[[ LEFT | FULL | INNER ]
JOIN join_item
[WITHIN [<size> <timeunit> | (<before_size> <timeunit>, <after_size> <timeunit>)] [GRACE PERIOD <grace_size> <timeunit>]]
ON join_criteria]*
[ WINDOW window_expression ]
[ WHERE where_condition ]
[ GROUP BY grouping_expression ]
Expand Down Expand Up @@ -208,7 +211,7 @@ SELECT windowstart, windowend, item_id, SUM(quantity)
EMIT CHANGES;
```

#### WITHIN
#### WITHIN and GRACE PERIOD

!!! note
Stream-Stream joins must have a WITHIN clause specified.
Expand All @@ -233,6 +236,32 @@ the order was placed, and shipped within 2 hours of the payment being received.
INNER JOIN shipments s WITHIN 2 HOURS ON s.id = o.id;
```

The GRACE PERIOD, part of the WITHIN clause, lets the join process out-of-order records that
arrive within the specified grace period. Records that arrive after the grace period has passed
are treated as _late_: they are dropped and not joined.

```sql
CREATE STREAM shipped_orders AS
SELECT
o.id as orderId,
o.itemid as itemId,
s.id as shipmentId,
p.id as paymentId
FROM orders o
INNER JOIN payments p WITHIN 1 HOURS GRACE PERIOD 15 MINUTES ON p.id = o.id
INNER JOIN shipments s WITHIN 2 HOURS GRACE PERIOD 15 MINUTES ON s.id = o.id;
```

If you don't specify a grace period explicitly, the default grace period is 24 hours. This can
consume a large amount of disk space on high-throughput streams, so we recommend setting an
explicit GRACE PERIOD to limit disk usage.

!!! important
If you specify a GRACE PERIOD for left/outer joins, the grace period defines when the left/outer
join result is emitted. If you don't specify a GRACE PERIOD for left/outer joins,
left/outer join results are emitted eagerly, which may cause "spurious" result records, so
we recommend that you specify a GRACE PERIOD.
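
As a minimal sketch of the recommendation above, the following left join sets an explicit
GRACE PERIOD and uses the `(before, after)` form of the WITHIN clause. The stream name
`orders_with_payments` is hypothetical, `orders` and `payments` are reused from the earlier
example, and the window and grace sizes are illustrative only.

```sql
-- Hypothetical stream name; orders and payments are reused from the example above.
-- The window and grace sizes are illustrative, not recommended values.
CREATE STREAM orders_with_payments AS
  SELECT
    o.id AS orderId,
    p.id AS paymentId
  FROM orders o
    LEFT JOIN payments p
      WITHIN (5 MINUTES, 1 HOURS) GRACE PERIOD 15 MINUTES
      ON p.id = o.id
  EMIT CHANGES;
```

Here `paymentId` is null for an order that has no matching payment.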

#### Out-of-order events

Accept events for up to two hours after the window ends. Events that arrive
Expand Up @@ -889,17 +889,28 @@ private static String visitWithinExpression(final WithinExpressionContext contex

stringBuilder.append(String.format("%s",
anonymizeJoinWindowSize(singleWithin.joinWindowSize())));

if (singleWithin.gracePeriodClause() != null) {
stringBuilder.append(String.format(" GRACE PERIOD %s",
anonymizeGracePeriod(singleWithin.gracePeriodClause())));
}
} else if (context instanceof JoinWindowWithBeforeAndAfterContext) {
final SqlBaseParser.JoinWindowWithBeforeAndAfterContext beforeAndAfterJoinWindow
= (SqlBaseParser.JoinWindowWithBeforeAndAfterContext) context;

stringBuilder.append(String.format("(%s, %s)",
anonymizeJoinWindowSize(beforeAndAfterJoinWindow.joinWindowSize(0)),
anonymizeJoinWindowSize(beforeAndAfterJoinWindow.joinWindowSize(1))));

if (beforeAndAfterJoinWindow.gracePeriodClause() != null) {
stringBuilder.append(String.format(" GRACE PERIOD %s",
anonymizeGracePeriod(beforeAndAfterJoinWindow.gracePeriodClause())));
}
} else {
throw new RuntimeException("Expecting either a single join window, ie \"WITHIN 10 "
+ "seconds\", or a join window with " + "before and after specified, ie. "
+ "\"WITHIN (10 seconds, 20 seconds)\"");
+ "seconds\" or \"WITHIN 10 seconds GRACE PERIOD 2 seconds\", or a join window with "
+ "before and after specified, ie. \"WITHIN (10 seconds, 20 seconds)\" or "
+ "\"WITHIN (10 seconds, 20 seconds) GRACE PERIOD 5 seconds\"");
}

return stringBuilder.append(' ').toString();
Expand Up @@ -133,6 +133,24 @@ public void shouldAnonymizeJoinStatementsCorrectly() {
Approvals.verify(output);
}

@Test
public void shouldAnonymizeJoinWithGraceStatementsCorrectly() {
final String output = anon.anonymize("INSERT INTO OUTPUT SELECT col1, col2, col3"
+ " FROM SOURCE1 S1 JOIN SOURCE2 S2 "
+ "WITHIN 1 SECOND GRACE PERIOD 2 SECONDS ON col1.k=col2.k;");

Approvals.verify(output);
}

@Test
public void shouldAnonymizeJoinWithBeforeAndAfterAndGraceStatementsCorrectly() {
final String output = anon.anonymize("INSERT INTO OUTPUT SELECT col1, col2, col3"
+ " FROM SOURCE1 S1 JOIN SOURCE2 S2 "
+ "WITHIN (1 SECOND, 3 SECONDS) GRACE PERIOD 2 SECONDS ON col1.k=col2.k;");

Approvals.verify(output);
}

@Test
public void shouldAnonymizeCreateStreamQueryCorrectly() {
final String output = anon.anonymize(
Expand Up @@ -56,4 +56,6 @@ INSERT INTO TARGET SELECT 1 as c1, 2.0 as c2 FROM SOURCE;
INSERT INTO SINK SELECT * FROM SOURCE;
INSERT INTO OUTPUT SELECT * FROM SOURCE2;
INSERT INTO OUTPUT SELECT S1.K AS K, S1.DATA AS DATA_1, S2.DATA AS DATA_2 FROM SOURCE1 S1 JOIN SOURCE2 S2 WITHIN 1 SECOND ON S1.K = S2.K;
INSERT INTO OUTPUT SELECT S1.K AS K, S1.DATA AS DATA_1, S2.DATA AS DATA_2 FROM SOURCE1 S1 JOIN SOURCE2 S2 WITHIN 1 SECOND GRACE PERIOD 2 SECONDS ON S1.K = S2.K;
INSERT INTO OUTPUT SELECT S1.K AS K, S1.DATA AS DATA_1, S2.DATA AS DATA_2 FROM SOURCE1 S1 JOIN SOURCE2 S2 WITHIN (1 SECOND, 3 SECONDS) GRACE PERIOD 2 SECONDS ON S1.K = S2.K;
DROP STREAM IF EXISTS input2;
Expand Up @@ -56,4 +56,6 @@ INSERT INTO stream43 SELECT column124, column126 FROM source4;
INSERT INTO stream44 SELECT * FROM source4;
INSERT INTO stream41 SELECT * FROM source3;
INSERT INTO stream41 SELECT column122, column128, column129 FROM source2 INNER JOIN source3 WITHIN '0' SECOND ON anonKey1=anonKey2;
INSERT INTO stream41 SELECT column122, column128, column129 FROM source2 INNER JOIN source3 WITHIN '0' SECOND GRACE PERIOD '0' SECONDS ON anonKey1=anonKey2;
INSERT INTO stream41 SELECT column122, column128, column129 FROM source2 INNER JOIN source3 WITHIN ('0' SECOND, '0' SECONDS) GRACE PERIOD '0' SECONDS ON anonKey1=anonKey2;
DROP STREAM IF EXISTS stream6;