Skip to content

Commit

Permalink
feat: Re-enable GRACE period with new stream-stream joins semantics (#…
Browse files Browse the repository at this point in the history
…8236)

Re-enable stream-stream left/outer joins fixes again.

* Revert "Revert GRACE keyword in Joins (#8020) (#8027)"
This reverts commit dac6e38.

* Revert "chore: revert GRACE PERIOD from joins (#8028)"
This reverts commit c17ce67.

* Revert "chore: revert call to ofTimeAndGracePeriod() (#8047)"
This reverts commit 4c54980.
  • Loading branch information
spena committed Oct 7, 2021
1 parent df25d2b commit f640f5e
Show file tree
Hide file tree
Showing 35 changed files with 4,086 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ CREATE [OR REPLACE] STREAM stream_name
[WITH ( property_name = expression [, ...] )]
AS SELECT select_expr [, ...]
FROM from_stream
[[ LEFT | FULL | INNER ] JOIN [join_table | join_stream] [ WITHIN [(before TIMEUNIT, after TIMEUNIT) | N TIMEUNIT] ] ON join_criteria]*
[[ LEFT | FULL | INNER ]
JOIN [join_table | join_stream]
[WITHIN [<size> <timeunit> | (<before_size> <timeunit>, <after_size> <timeunit>)] [GRACE PERIOD <grace_size> <timeunit>]]
ON join_criteria]*
[ WHERE condition ]
[PARTITION BY column_name]
EMIT CHANGES;
Expand Down
5 changes: 4 additions & 1 deletion docs/developer-guide/ksqldb-reference/insert-into.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ Synopsis
INSERT INTO stream_name
SELECT select_expr [, ...]
FROM from_stream
[ LEFT | FULL | INNER ] JOIN [join_table | join_stream] [ WITHIN [(before TIMEUNIT, after TIMEUNIT) | N TIMEUNIT] ] ON join_criteria
[ LEFT | FULL | INNER ]
JOIN [join_table | join_stream]
[WITHIN [<size> <timeunit> | (<before_size> <timeunit>, <after_size> <timeunit>)] [GRACE PERIOD <grace_size> <timeunit>]]
ON join_criteria
[ WHERE condition ]
[ PARTITION BY new_key_expr [, ...] ]
EMIT CHANGES;
Expand Down
19 changes: 12 additions & 7 deletions docs/developer-guide/ksqldb-reference/quick-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,10 @@ CREATE STREAM stream_name
[WITH ( property_name = expression [, ...] )]
AS SELECT select_expr [, ...]
FROM from_stream
[[ LEFT | FULL | INNER ] JOIN [join_table | join_stream]
[ WITHIN [(before TIMEUNIT, after TIMEUNIT) | N TIMEUNIT] ]
ON join_criteria]*
[[ LEFT | FULL | INNER ]
JOIN [join_table | join_stream]
[WITHIN [<size> <timeunit> | (<before_size> <timeunit>, <after_size> <timeunit>)] [GRACE PERIOD <grace_size> <timeunit>]]
ON join_criteria]*
[ WHERE condition ]
[PARTITION BY new_key_expr [, ...]]
EMIT CHANGES;
Expand Down Expand Up @@ -386,9 +387,10 @@ Stream the result of a SELECT query into an existing stream and its underlying
INSERT INTO stream_name
SELECT select_expr [, ...]
FROM from_stream
[ LEFT | FULL | INNER ] JOIN [join_table | join_stream]
[ WITHIN [(before TIMEUNIT, after TIMEUNIT) | N TIMEUNIT] ]
ON join_criteria
[ LEFT | FULL | INNER ]
JOIN [join_table | join_stream]
[WITHIN [<size> <timeunit> | (<before_size> <timeunit>, <after_size> <timeunit>)] [GRACE PERIOD <grace_size> <timeunit>]]
ON join_criteria
[ WHERE condition ]
[ PARTITION BY new_key_expr [, ...] ]
EMIT CHANGES;
Expand Down Expand Up @@ -486,7 +488,10 @@ information, see [SELECT (Push Query)](../../ksqldb-reference/select-push-query)
```sql
SELECT select_expr [, ...]
FROM from_item
[[ LEFT | FULL | INNER ] JOIN join_item ON [ WITHIN [(before TIMEUNIT, after TIMEUNIT) | N TIMEUNIT] ] join_criteria]*
[[ LEFT | FULL | INNER ]
JOIN join_item
[WITHIN [<size> <timeunit> | (<before_size> <timeunit>, <after_size> <timeunit>)] [GRACE PERIOD <grace_size> <timeunit>]]
ON join_criteria]*
[ WINDOW window_expression ]
[ WHERE condition ]
[ GROUP BY grouping_expression [, ...] ]
Expand Down
33 changes: 31 additions & 2 deletions docs/developer-guide/ksqldb-reference/select-push-query.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ keywords: ksqlDB, select, push query
```sql
SELECT select_expr [, ...]
FROM from_item
[[ LEFT | FULL | INNER ] JOIN join_item ON [ WITHIN [(before TIMEUNIT, after TIMEUNIT) | N TIMEUNIT] ] join_criteria]*
[[ LEFT | FULL | INNER ]
JOIN join_item
[WITHIN [<size> <timeunit> | (<before_size> <timeunit>, <after_size> <timeunit>)] [GRACE PERIOD <grace_size> <timeunit>]]
ON join_criteria]*
[ WINDOW window_expression ]
[ WHERE where_condition ]
[ GROUP BY grouping_expression ]
Expand Down Expand Up @@ -208,7 +211,7 @@ SELECT windowstart, windowend, item_id, SUM(quantity)
EMIT CHANGES;
```

#### WITHIN
#### WITHIN and GRACE PERIOD

!!! note
Stream-Stream joins must have a WITHIN clause specified.
Expand All @@ -233,6 +236,32 @@ the order was placed, and shipped within 2 hours of the payment being received.
INNER JOIN shipments s WITHIN 2 HOURS ON s.id = o.id;
```

The GRACE PERIOD, part of the WITHIN clause, allows the join to process out-of-order records for up
to the specified grace period. Events that arrive after the grace period has passed are dropped
as _late_ records and not joined.

```sql
CREATE STREAM shipped_orders AS
SELECT
o.id as orderId,
o.itemid as itemId,
s.id as shipmentId,
p.id as paymentId
FROM orders o
INNER JOIN payments p WITHIN 1 HOURS GRACE PERIOD 15 MINUTES ON p.id = o.id
INNER JOIN shipments s WITHIN 2 HOURS GRACE PERIOD 15 MINUTES ON s.id = o.id;
```

If you don't specify a grace period explicitly, the default grace period is 24 hours. This could
cause a huge amount of disk usage on high-throughput streams. Setting a specific GRACE PERIOD is
recommended to reduce disk usage.

!!! important
If you specify a GRACE PERIOD for left/outer joins, the grace period defines when the left/outer
join result is emitted. If you don't specify a GRACE PERIOD for left/outer joins,
left/outer join results are emitted eagerly, which may cause "spurious" result records, so
we recommend that you specify a GRACE PERIOD.

#### Out-of-order events

Accept events for up to two hours after the window ends. Events that arrive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -889,17 +889,28 @@ private static String visitWithinExpression(final WithinExpressionContext contex

stringBuilder.append(String.format("%s",
anonymizeJoinWindowSize(singleWithin.joinWindowSize())));

if (singleWithin.gracePeriodClause() != null) {
stringBuilder.append(String.format(" GRACE PERIOD %s",
anonymizeGracePeriod(singleWithin.gracePeriodClause())));
}
} else if (context instanceof JoinWindowWithBeforeAndAfterContext) {
final SqlBaseParser.JoinWindowWithBeforeAndAfterContext beforeAndAfterJoinWindow
= (SqlBaseParser.JoinWindowWithBeforeAndAfterContext) context;

stringBuilder.append(String.format("(%s, %s)",
anonymizeJoinWindowSize(beforeAndAfterJoinWindow.joinWindowSize(0)),
anonymizeJoinWindowSize(beforeAndAfterJoinWindow.joinWindowSize(1))));

if (beforeAndAfterJoinWindow.gracePeriodClause() != null) {
stringBuilder.append(String.format(" GRACE PERIOD %s",
anonymizeGracePeriod(beforeAndAfterJoinWindow.gracePeriodClause())));
}
} else {
throw new RuntimeException("Expecting either a single join window, ie \"WITHIN 10 "
+ "seconds\", or a join window with " + "before and after specified, ie. "
+ "\"WITHIN (10 seconds, 20 seconds)\"");
+ "seconds\" or \"WITHIN 10 seconds GRACE PERIOD 2 seconds\", or a join window with "
+ "before and after specified, ie. \"WITHIN (10 seconds, 20 seconds)\" or "
+ "WITHIN (10 seconds, 20 seconds) GRACE PERIOD 5 seconds");
}

return stringBuilder.append(' ').toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,24 @@ public void shouldAnonymizeJoinStatementsCorrectly() {
Approvals.verify(output);
}

@Test
public void shouldAnonymizeJoinWithGraceStatementsCorrectly() {
  // Given: a join whose single-size WITHIN window also carries a GRACE PERIOD clause.
  final String statement = "INSERT INTO OUTPUT SELECT col1, col2, col3"
      + " FROM SOURCE1 S1 JOIN SOURCE2 S2 "
      + "WITHIN 1 SECOND GRACE PERIOD 2 SECONDS ON col1.k=col2.k;";

  // When: the statement is run through the anonymizer.
  final String anonymized = anon.anonymize(statement);

  // Then: the anonymized text is checked by the approval test.
  Approvals.verify(anonymized);
}

@Test
public void shouldAnonymizeJoinWithBeforeAndAfterAndGraceStatementsCorrectly() {
  // Given: a join whose WITHIN window uses the (before, after) form plus a GRACE PERIOD clause.
  final String statement = "INSERT INTO OUTPUT SELECT col1, col2, col3"
      + " FROM SOURCE1 S1 JOIN SOURCE2 S2 "
      + "WITHIN (1 SECOND, 3 SECONDS) GRACE PERIOD 2 SECONDS ON col1.k=col2.k;";

  // When: the statement is run through the anonymizer.
  final String anonymized = anon.anonymize(statement);

  // Then: the anonymized text is checked by the approval test.
  Approvals.verify(anonymized);
}

@Test
public void shouldAnonymizeCreateStreamQueryCorrectly() {
final String output = anon.anonymize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,6 @@ INSERT INTO TARGET SELECT 1 as c1, 2.0 as c2 FROM SOURCE;
INSERT INTO SINK SELECT * FROM SOURCE;
INSERT INTO OUTPUT SELECT * FROM SOURCE2;
INSERT INTO OUTPUT SELECT S1.K AS K, S1.DATA AS DATA_1, S2.DATA AS DATA_2 FROM SOURCE1 S1 JOIN SOURCE2 S2 WITHIN 1 SECOND ON S1.K = S2.K;
INSERT INTO OUTPUT SELECT S1.K AS K, S1.DATA AS DATA_1, S2.DATA AS DATA_2 FROM SOURCE1 S1 JOIN SOURCE2 S2 WITHIN 1 SECOND GRACE PERIOD 2 SECONDS ON S1.K = S2.K;
INSERT INTO OUTPUT SELECT S1.K AS K, S1.DATA AS DATA_1, S2.DATA AS DATA_2 FROM SOURCE1 S1 JOIN SOURCE2 S2 WITHIN (1 SECOND, 3 SECONDS) GRACE PERIOD 2 SECONDS ON S1.K = S2.K;
DROP STREAM IF EXISTS input2;
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,6 @@ INSERT INTO stream43 SELECT column124, column126 FROM source4;
INSERT INTO stream44 SELECT * FROM source4;
INSERT INTO stream41 SELECT * FROM source3;
INSERT INTO stream41 SELECT column122, column128, column129 FROM source2 INNER JOIN source3 WITHIN '0' SECOND ON anonKey1=anonKey2;
INSERT INTO stream41 SELECT column122, column128, column129 FROM source2 INNER JOIN source3 WITHIN '0' SECOND GRACE PERIOD '0' SECONDS ON anonKey1=anonKey2;
INSERT INTO stream41 SELECT column122, column128, column129 FROM source2 INNER JOIN source3 WITHIN ('0' SECOND, '0' SECONDS) GRACE PERIOD '0' SECONDS ON anonKey1=anonKey2;
DROP STREAM IF EXISTS stream6;
Loading

0 comments on commit f640f5e

Please sign in to comment.