15 Interval Joins

💡 This example will show how you can perform joins between tables with events that are related in a temporal context.

Why Interval Joins?

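In a previous recipe, you learned about using regular joins in Flink SQL. A regular join has to keep every input row from both sides in state indefinitely, because a row arriving later may still match it. This works well for some scenarios, but on unbounded streams the state keeps growing, so a more efficient type of join is needed to keep resource utilization in check.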

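One of the ways to optimize join operations in Flink SQL is to use interval joins. In addition to at least one equality predicate, an interval join has a join predicate that checks whether the time attributes of the input events lie within certain bounds of each other (i.e. a time window). Because of these bounds, Flink can discard rows from state once they can no longer satisfy the predicate.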
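To make the matching rule concrete, here is a minimal batch model in plain Python. It is not Flink's API or its execution strategy. It only shows which pairs an interval join keeps. The function name `interval_join` and the sample rows are hypothetical. Each row is assumed to be a `(key, time)` tuple.

```python
from datetime import datetime, timedelta


def interval_join(left, right, lower, upper):
    """Batch model of an interval join over (key, time) tuples.

    Keeps pairs with equal keys whose times satisfy
    right_time + lower <= left_time <= right_time + upper,
    i.e. left_time BETWEEN right_time + lower AND right_time + upper.
    """
    return [
        (l_key, l_time, r_time)
        for l_key, l_time in left
        for r_key, r_time in right
        if l_key == r_key and r_time + lower <= l_time <= r_time + upper
    ]


# Hypothetical rows: (order_id, order_time) and (order_id, shipment_time)
orders = [(1, datetime(2024, 1, 1)), (2, datetime(2024, 1, 1)), (3, datetime(2024, 1, 5))]
shipments = [(1, datetime(2024, 1, 3)), (2, datetime(2024, 1, 6)), (3, datetime(2024, 1, 4))]

# Equivalent of: o.order_time BETWEEN s.shipment_time - INTERVAL '3' DAY AND s.shipment_time
matches = interval_join(orders, shipments, lower=timedelta(days=-3), upper=timedelta(days=0))
print(matches)
```

Only order 1 matches. Order 2 shipped five days after it was placed, and order 3 has a shipment time earlier than its order time.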

Using Interval Joins

Suppose you want to join events from two tables that correlate with each other in the order fulfillment lifecycle (orders and shipments) and that are subject to a Service-Level Agreement (SLA) of 3 days. To reduce the number of input rows Flink has to retain and to optimize the join operation, you can add a time constraint to the WHERE clause that bounds the time on both sides to that interval, using a BETWEEN predicate.
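The state saving can be sketched with a toy streaming model in plain Python. This is not Flink's implementation or API. The model makes three assumptions:

- Times are plain numbers of days.
- A watermark guarantees that no later row carries a smaller time, so there is no late data.
- The class and method names (`ToyIntervalJoin`, `advance_watermark`) are hypothetical.

For `order_time BETWEEN shipment_time - 3 AND shipment_time`, a buffered order can be dropped once the watermark passes `order_time + 3`. A buffered shipment can be dropped once the watermark passes `shipment_time`.

```python
class ToyIntervalJoin:
    """Toy model of the state kept for
    order_time BETWEEN shipment_time - bound AND shipment_time.
    Assumes no row arrives with a time below the current watermark."""

    def __init__(self, bound):
        self.bound = bound
        self.orders = []     # buffered (order_id, order_time)
        self.shipments = []  # buffered (order_id, shipment_time)

    def _matches(self, order_time, shipment_time):
        return shipment_time - self.bound <= order_time <= shipment_time

    def add_order(self, order_id, order_time):
        # Buffer the order, then emit joins with already-buffered shipments
        self.orders.append((order_id, order_time))
        return [(order_id, order_time, s_time)
                for s_id, s_time in self.shipments
                if s_id == order_id and self._matches(order_time, s_time)]

    def add_shipment(self, order_id, shipment_time):
        # Buffer the shipment, then emit joins with already-buffered orders
        self.shipments.append((order_id, shipment_time))
        return [(order_id, o_time, shipment_time)
                for o_id, o_time in self.orders
                if o_id == order_id and self._matches(o_time, shipment_time)]

    def advance_watermark(self, watermark):
        # Future shipments have shipment_time >= watermark, so an order
        # can still match only if order_time + bound >= watermark.
        self.orders = [(i, t) for i, t in self.orders if t + self.bound >= watermark]
        # Future orders have order_time >= watermark, so a shipment
        # can still match only if shipment_time >= watermark.
        self.shipments = [(i, t) for i, t in self.shipments if t >= watermark]


join = ToyIntervalJoin(bound=3)
print(join.add_order(1, 10))     # []
print(join.add_shipment(1, 12))  # [(1, 10, 12)]
print(join.add_order(2, 11))     # []
join.advance_watermark(14)
print(join.orders, join.shipments)  # [(2, 11)] []
```

A regular join has no such bound to compare against, so it would have to keep both lists forever.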

Script

The source tables (orders and shipments) are backed by the built-in datagen connector, which generates rows in memory (here, until the sequence of 1000 ids is exhausted).

```sql
CREATE TABLE orders (
  id INT,
  -- Computed column: CURRENT_TIMESTAMP shifted back by a random whole number of days
  order_time AS TIMESTAMPADD(DAY, CAST(FLOOR(RAND()*(1-5+1)+5)*(-1) AS INT), CURRENT_TIMESTAMP)
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  -- Emit order ids 1..1000 in sequence
  'fields.id.kind'='sequence',
  'fields.id.start'='1',
  'fields.id.end'='1000'
);

CREATE TABLE shipments (
  id INT,
  order_id INT,
  -- Computed column: CURRENT_TIMESTAMP shifted back by a random whole number of days
  shipment_time AS TIMESTAMPADD(DAY, CAST(FLOOR(RAND()*(1-5+1)) AS INT), CURRENT_TIMESTAMP)
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='5',
  'fields.id.kind'='random',
  'fields.id.min'='0',
  -- Each shipment references an order id from 1..1000, in sequence
  'fields.order_id.kind'='sequence',
  'fields.order_id.start'='1',
  'fields.order_id.end'='1000'
);

SELECT
  o.id AS order_id,
  o.order_time,
  s.shipment_time,
  TIMESTAMPDIFF(DAY, o.order_time, s.shipment_time) AS day_diff
FROM orders o
JOIN shipments s ON o.id = s.order_id
-- Keep only orders placed at most 3 days before (and not after) the shipment
WHERE
    o.order_time BETWEEN s.shipment_time - INTERVAL '3' DAY AND s.shipment_time;
```
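The `RAND()` expressions in the computed columns are hard to read at a glance, so this plain-Python sketch evaluates them by hand. The helper names are hypothetical. `r` stands in for a value returned by `RAND()`, which is assumed to lie in [0, 1).

```python
import math

# r stands in for a value returned by RAND(), i.e. 0 <= r < 1.


def order_day_offset(r):
    # CAST(FLOOR(RAND()*(1-5+1)+5)*(-1) AS INT); note that (1-5+1) == -3
    return int(math.floor(r * (1 - 5 + 1) + 5) * (-1))


def shipment_day_offset(r):
    # CAST(FLOOR(RAND()*(1-5+1)) AS INT)
    return int(math.floor(r * (1 - 5 + 1)))


# Evaluate both expressions on an evenly spaced grid of r values in [0, 1)
grid = [i / 1000 for i in range(1000)]
print(sorted({order_day_offset(r) for r in grid}))     # [-5, -4, -3, -2]
print(sorted({shipment_day_offset(r) for r in grid}))  # [-3, -2, -1, 0]
```

The `-5` and `0` offsets only occur when `RAND()` returns exactly 0. Otherwise `order_time` lands 2 to 4 days before the current time and `shipment_time` lands 1 to 3 days before it. The BETWEEN predicate then keeps the pairs whose shipment follows the order by 0 to 3 days.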

Example Output

[Image: 15_interval_joins, example output of the interval join query]