Flink: add limit pushdown for IcebergTableSource #1822

Merged
openinx merged 2 commits into apache:master from zhangjun0x01:limitPushDown
Dec 10, 2020

@zhangjun0x01 zhangjun0x01 commented Nov 25, 2020

Related to #1816.

For now the source implements the LimitableTableSource interface.
After Flink 1.12, it should implement SupportsLimitPushDown instead.

@github-actions github-actions bot added the flink label Nov 25, 2020
@zhangjun0x01 zhangjun0x01 force-pushed the limitPushDown branch 2 times, most recently from bd3efd3 to 06ae616 on November 26, 2020 01:41

  public IcebergTableSource(TableLoader loader, TableSchema schema, Map<String, String> properties) {
-   this(loader, schema, properties, null);
+   this(loader, schema, properties, null, false, -1);

Member:
Why is limit pushdown disabled by default for the Flink table source?

Contributor Author:
I think it is related to the design of the LimitableTableSource interface in Flink 1.11. I looked at some implementations of LimitableTableSource in Flink, such as HiveTableSource, and limit pushdown is disabled by default there.

Member:
Oh, the table won't have a LIMIT clause by default, so pushdown should be disabled by default. It's OK here.

Member:
Could we add the final modifier? There is also no need to initialize it to false, because the constructor always assigns it a value.

Member:
If limit is -1L, that means limit pushdown is disabled, right? If so, why do we need two fields?

Contributor Author:
I also think we could decide whether to push down from the value of limit alone, but the LimitableTableSource interface provides two methods, isLimitPushedDown and applyLimit. From the method comments, I think the interface's author intends isLimitPushedDown to be the way to tell whether the limit has been pushed down.

In versions after Flink 1.12, a new interface, SupportsLimitPushDown, is provided. It has only one method, so there I think we can decide from the value of limit.
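
To make the contrast concrete, here is a minimal, self-contained sketch. `LegacyLimitable` and `LimitPushDown` are simplified stand-ins written for this illustration; they only mirror the method shapes of Flink's `LimitableTableSource` and `SupportsLimitPushDown` as discussed above. `SketchSource`, `SketchDynamicSource`, and the `NO_LIMIT` sentinel are hypothetical names, not the code in this PR.

```java
// Stand-in for the two-method shape of LimitableTableSource (simplified assumption).
interface LegacyLimitable {
  boolean isLimitPushedDown();

  LegacyLimitable applyLimit(long limit);
}

// Stand-in for the one-method shape of SupportsLimitPushDown (simplified assumption).
interface LimitPushDown {
  void applyLimit(long limit);
}

// Hypothetical immutable source: the pushdown flag is derived from the limit value,
// so a single field is enough even though the interface has two methods.
final class SketchSource implements LegacyLimitable {
  static final long NO_LIMIT = -1L; // assumed sentinel for "no limit pushed down"

  private final long limit;

  SketchSource() {
    this(NO_LIMIT);
  }

  private SketchSource(long limit) {
    this.limit = limit;
  }

  @Override
  public boolean isLimitPushedDown() {
    return limit != NO_LIMIT;
  }

  @Override
  public SketchSource applyLimit(long newLimit) {
    // Returns a copy carrying the limit; the original source stays unlimited.
    return new SketchSource(newLimit);
  }

  long limit() {
    return limit;
  }
}

// Hypothetical source for the one-method interface: the limit is stored in place.
final class SketchDynamicSource implements LimitPushDown {
  private long limit = SketchSource.NO_LIMIT;

  @Override
  public void applyLimit(long newLimit) {
    this.limit = newLimit;
  }

  boolean isLimited() {
    return limit != SketchSource.NO_LIMIT;
  }
}
```

With the two-method shape, the planner can ask `isLimitPushedDown()` separately from calling `applyLimit`; the sketch shows that the answer can still be computed from one stored value.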

Member:
Here isLimitPushDown is always true. How about using String.format(", LimitPushDown: %d", limit) instead?
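
A small sketch of that suggestion, assuming the explain string is built by appending to a base description. `ExplainSketch`, `explain`, and the "limit >= 0 means pushed down" rule are hypothetical and only illustrate the formatting call.

```java
// Hypothetical helper showing the suggested format string in isolation.
final class ExplainSketch {
  private ExplainSketch() {
  }

  // Appends the pushed-down limit to a base description.
  // Assumption: a negative limit means nothing was pushed down, so nothing is appended.
  static String explain(String base, long limit) {
    if (limit < 0) {
      return base;
    }
    return base + String.format(", LimitPushDown: %d", limit);
  }
}
```

Printing the limit value carries more information than printing a flag that is always true at this point.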

Member:
Do we need this? We don't parse the limit from properties, right?

Contributor Author:
I removed it.

Member:
How about just using context.limit() in reachedEnd, rather than introducing a new transient field limit?
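
Roughly, the idea could look like the sketch below. `ScanContextSketch` and `LimitedReaderSketch` are hypothetical stand-ins for the scan context and the input format; only `context.limit()` and `reachedEnd` come from the comment above, and treating a negative limit as "no limit" is an assumption.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the scan context; only limit() matters here.
final class ScanContextSketch {
  private final long limit;

  ScanContextSketch(long limit) {
    this.limit = limit;
  }

  long limit() {
    return limit;
  }
}

// Hypothetical reader with the nextRecord/reachedEnd shape of an input format.
final class LimitedReaderSketch<T> {
  private final ScanContextSketch context;
  private final Iterator<T> iterator;
  private long currentReadCount = 0L;

  LimitedReaderSketch(ScanContextSketch context, List<T> rows) {
    this.context = context;
    this.iterator = rows.iterator();
  }

  boolean reachedEnd() {
    // Read the limit straight from the context; no separate limit field is kept.
    // Assumption: a negative limit means "no limit".
    if (context.limit() >= 0 && currentReadCount >= context.limit()) {
      return true;
    }
    return !iterator.hasNext();
  }

  T nextRecord() {
    currentReadCount++;
    return iterator.next();
  }
}
```

The only state the reader adds is the running record count; the limit itself has a single owner, the context.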

Contributor Author:
I deleted the limit field.

Member:
nit: with -> WITH

public void testLimitPushDown() {
sql("INSERT INTO %s VALUES (1,'a'),(2,'b')", TABLE_NAME);

String querySql = String.format("SELECT * FROM %s LIMIT 1", TABLE_NAME);

Member:
Do we need to add these test cases?

Case 1: SELECT * FROM test LIMIT -1
Case 2: SELECT * FROM test LIMIT 0
Case 3: SELECT * FROM test LIMIT 3, where the limit exceeds the total number of rows in the table.
Case 4: SELECT * FROM test WHERE a = 1 LIMIT 2, which queries data with both a limit and a filter.

etc.
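
The expected behavior for these cases can be sketched against a plain in-memory list. `LimitCasesSketch.query` is a hypothetical helper, not part of the PR's tests; it rejects a negative limit itself, which is an assumption (a real SQL engine may reject it earlier, at parse or validation time).

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical in-memory model of "SELECT * FROM rows WHERE filter LIMIT limit".
final class LimitCasesSketch {
  private LimitCasesSketch() {
  }

  static <T> List<T> query(List<T> rows, Predicate<? super T> filter, long limit) {
    if (limit < 0) {
      // Case 1: a negative limit is treated as invalid input (assumption).
      throw new IllegalArgumentException("LIMIT must not be negative: " + limit);
    }
    // The filter is applied before the limit, so Case 4 returns at most
    // `limit` rows out of those that match.
    return rows.stream().filter(filter).limit(limit).collect(Collectors.toList());
  }
}
```

For a two-row table, this model gives no rows for LIMIT 0, both rows for LIMIT 3, and only the matching row for a filter combined with LIMIT 2.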

Contributor Author:
I added the test cases.

fix some bugs
@openinx openinx merged commit 7645ceb into apache:master Dec 10, 2020

openinx commented Dec 10, 2020

Merged this PR, thanks @zhangjun0x01 for contributing.
