Skip to content

Commit

Permalink
Revert GRACE keyword in Joins (#8020) (#8027)
Browse files Browse the repository at this point in the history
Co-authored-by: Sergio Peña <sergio@confluent.io>
  • Loading branch information
JimGalasyn and spena committed Aug 19, 2021
1 parent 3f90bb9 commit dac6e38
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ CREATE [OR REPLACE] STREAM stream_name
[WITH ( property_name = expression [, ...] )]
AS SELECT select_expr [, ...]
FROM from_stream
[[ LEFT | FULL | INNER ]
JOIN [join_table | join_stream]
[WITHIN [<size> <timeunit> | (<before_size> <timeunit>, <after_size> <timeunit>)] [GRACE PERIOD <grace_size> <timeunit>]]
ON join_criteria]*
[[ LEFT | FULL | INNER ] JOIN [join_table | join_stream] [ WITHIN [(before TIMEUNIT, after TIMEUNIT) | N TIMEUNIT] ] ON join_criteria]*
[ WHERE condition ]
[PARTITION BY column_name]
EMIT CHANGES;
Expand Down
5 changes: 1 addition & 4 deletions docs/developer-guide/ksqldb-reference/insert-into.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ Synopsis
INSERT INTO stream_name
SELECT select_expr [, ...]
FROM from_stream
[ LEFT | FULL | INNER ]
JOIN [join_table | join_stream]
[WITHIN [<size> <timeunit> | (<before_size> <timeunit>, <after_size> <timeunit>)] [GRACE PERIOD <grace_size> <timeunit>]]
ON join_criteria
[ LEFT | FULL | INNER ] JOIN [join_table | join_stream] [ WITHIN [(before TIMEUNIT, after TIMEUNIT) | N TIMEUNIT] ] ON join_criteria
[ WHERE condition ]
[ PARTITION BY new_key_expr [, ...] ]
EMIT CHANGES;
Expand Down
19 changes: 7 additions & 12 deletions docs/developer-guide/ksqldb-reference/quick-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,9 @@ CREATE STREAM stream_name
[WITH ( property_name = expression [, ...] )]
AS SELECT select_expr [, ...]
FROM from_stream
[[ LEFT | FULL | INNER ]
JOIN [join_table | join_stream]
[WITHIN [<size> <timeunit> | (<before_size> <timeunit>, <after_size> <timeunit>)] [GRACE PERIOD <grace_size> <timeunit>]]
ON join_criteria]*
[[ LEFT | FULL | INNER ] JOIN [join_table | join_stream]
[ WITHIN [(before TIMEUNIT, after TIMEUNIT) | N TIMEUNIT] ]
ON join_criteria]*
[ WHERE condition ]
[PARTITION BY new_key_expr [, ...]]
EMIT CHANGES;
Expand Down Expand Up @@ -387,10 +386,9 @@ Stream the result of a SELECT query into an existing stream and its underlying
INSERT INTO stream_name
SELECT select_expr [, ...]
FROM from_stream
[ LEFT | FULL | INNER ]
JOIN [join_table | join_stream]
[WITHIN [<size> <timeunit> | (<before_size> <timeunit>, <after_size> <timeunit>)] [GRACE PERIOD <grace_size> <timeunit>]]
ON join_criteria
[ LEFT | FULL | INNER ] JOIN [join_table | join_stream]
[ WITHIN [(before TIMEUNIT, after TIMEUNIT) | N TIMEUNIT] ]
ON join_criteria
[ WHERE condition ]
[ PARTITION BY new_key_expr [, ...] ]
EMIT CHANGES;
Expand Down Expand Up @@ -488,10 +486,7 @@ information, see [SELECT (Push Query)](../../ksqldb-reference/select-push-query)
```sql
SELECT select_expr [, ...]
FROM from_item
[[ LEFT | FULL | INNER ]
JOIN join_item
[WITHIN [<size> <timeunit> | (<before_size> <timeunit>, <after_size> <timeunit>)] [GRACE PERIOD <grace_size> <timeunit>]]
ON join_criteria]*
[[ LEFT | FULL | INNER ] JOIN join_item ON [ WITHIN [(before TIMEUNIT, after TIMEUNIT) | N TIMEUNIT] ] join_criteria]*
[ WINDOW window_expression ]
[ WHERE condition ]
[ GROUP BY grouping_expression [, ...] ]
Expand Down
33 changes: 2 additions & 31 deletions docs/developer-guide/ksqldb-reference/select-push-query.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@ keywords: ksqlDB, select, push query
```sql
SELECT select_expr [, ...]
FROM from_item
[[ LEFT | FULL | INNER ]
JOIN join_item
[WITHIN [<size> <timeunit> | (<before_size> <timeunit>, <after_size> <timeunit>)] [GRACE PERIOD <grace_size> <timeunit>]]
ON join_criteria]*
[[ LEFT | FULL | INNER ] JOIN join_item ON [ WITHIN [(before TIMEUNIT, after TIMEUNIT) | N TIMEUNIT] ] join_criteria]*
[ WINDOW window_expression ]
[ WHERE where_condition ]
[ GROUP BY grouping_expression ]
Expand Down Expand Up @@ -211,7 +208,7 @@ SELECT windowstart, windowend, item_id, SUM(quantity)
EMIT CHANGES;
```

#### WITHIN and GRACE PERIOD
#### WITHIN

!!! note
Stream-Stream joins must have a WITHIN clause specified.
Expand All @@ -236,32 +233,6 @@ the order was placed, and shipped within 2 hours of the payment being received.
INNER JOIN shipments s WITHIN 2 HOURS ON s.id = o.id;
```

The GRACE PERIOD, part of the WITHIN clause, allows the join to process out-of-order records for up
to the specified grace period. Events that arrive after the grace period has passed are dropped
as _late_ records and not joined.

```sql
CREATE STREAM shipped_orders AS
SELECT
o.id as orderId
o.itemid as itemId,
s.id as shipmentId,
p.id as paymentId
FROM orders o
INNER JOIN payments p WITHIN 1 HOURS GRACE PERIOD 15 MINUTES ON p.id = o.id
INNER JOIN shipments s WITHIN 2 HOURS GRACE PERIOD 15 MINUTES ON s.id = o.id;
```

If you don't specify a grace period explicitly, the default grace period is 24 hours. This could
cause a huge amount of disk usage on high-throughput streams. Setting a specific GRACE PERIOD is
recommended to reduce high disk usage.

!!! important
If you specify a GRACE PERIOD for left/outer joins, the grace period defines when the left/outer
join result is emitted. If you don't specify a GRACE PERIOD for left/outer joins,
left/outer join results are emitted eagerly, which may cause "spurious" result records, so
we recommended that you specify a GRACE PERIOD.

#### Out-of-order events

Accept events for up to two hours after the window ends. Events that arrive
Expand Down

0 comments on commit dac6e38

Please sign in to comment.