[flink] FlinkTableSource should implement SupportsWatermarkPushDown #2971
Search before asking
- I searched in the issues and found nothing similar.
Description
Currently, FlinkTableSource does not implement the SupportsWatermarkPushDown interface. This causes two significant problems when a Fluss table has a WATERMARK definition in Flink SQL.
When SupportsWatermarkPushDown is not implemented, Flink's planner inserts a standalone WatermarkAssigner node between the table scan and downstream operators. The Flink optimization rule PushWatermarkIntoTableSourceScanRule only fires when the source implements SupportsWatermarkPushDown. Without it, the rule never fires and the WatermarkAssigner node remains as a separate operator in the query plan.
This breaks partition filter pushdown: Flink's PushFilterIntoTableSourceScanRule cannot push filter predicates past the WatermarkAssigner node into the source scan. As a result, a query like:
SELECT a, b, c FROM partitioned_table WHERE c = '2025'
produces a plan where the partition filter is not pushed into the TableSourceScan:
-- Without SupportsWatermarkPushDown (current behavior):
Filter(condition=[=(c, '2025')])
  WatermarkAssigner(...)
    TableSourceScan(...) -- no filter, no partition pruning

-- With SupportsWatermarkPushDown (expected behavior):
TableSourceScan(
  watermark=[-(ts, 5000:INTERVAL SECOND)],
  watermarkEmitStrategy=[on-periodic],
  filter=[=(c, '2025')] -- partition pruning applied
)
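To make the rule interaction described above concrete, here is a minimal toy model in plain Java. It is not Flink planner code: the `Scan` class, the `optimize` method, and the string-based plan are hypothetical stand-ins invented for illustration. The only behavior it encodes is the one claimed in this issue: each push-down rule fires only when its operator sits directly above the scan, and the watermark rule additionally requires the source to opt in.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkPushDownSketch {

    /** Toy stand-in for a table source scan; NOT Flink's DynamicTableSource. */
    static final class Scan {
        final boolean supportsWatermarkPushDown;
        final List<String> pushed = new ArrayList<>();

        Scan(boolean supportsWatermarkPushDown) {
            this.supportsWatermarkPushDown = supportsWatermarkPushDown;
        }
    }

    /** Applies the two simplified rules in order and renders the resulting plan. */
    static String optimize(boolean supportsWatermarkPushDown) {
        Scan scan = new Scan(supportsWatermarkPushDown);

        // Operators above the scan, top-most first.
        List<String> above = new ArrayList<>();
        above.add("Filter(c = '2025')");
        above.add("WatermarkAssigner(ts - 5s)");

        // Simplified watermark rule: needs the assigner directly above the scan
        // AND a source that opted in to watermark push-down.
        if (scan.supportsWatermarkPushDown && directlyAboveScan(above, "WatermarkAssigner")) {
            above.remove(above.size() - 1);
            scan.pushed.add("watermark=[ts - 5s]");
        }

        // Simplified filter rule: needs the filter directly above the scan.
        if (directlyAboveScan(above, "Filter")) {
            above.remove(above.size() - 1);
            scan.pushed.add("filter=[c = '2025']");
        }

        List<String> plan = new ArrayList<>(above);
        plan.add("TableSourceScan(" + String.join(", ", scan.pushed) + ")");
        return String.join(" -> ", plan);
    }

    private static boolean directlyAboveScan(List<String> above, String operatorName) {
        return !above.isEmpty() && above.get(above.size() - 1).startsWith(operatorName);
    }

    public static void main(String[] args) {
        System.out.println(optimize(false)); // assigner stays, so the filter is blocked
        System.out.println(optimize(true));  // both are merged into the scan
    }
}
```

In this model, the source that does not opt in keeps the WatermarkAssigner in place, which in turn blocks the filter. The real fix would have FlinkTableSource implement Flink's SupportsWatermarkPushDown interface; the sketch only mirrors why that unblocks the filter rule.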
Willingness to contribute
- I'm willing to submit a PR!