[FLINK-12846][table] Carry primary key and unique key information in TableSchema #8736
Conversation
… TableSchema work in Blink planner
@godfreyhe, do you have time to have a look?
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot and I help the community with automated checks. Last check on commit dda9e2c (Wed Dec 04 15:21:18 UTC 2019). Mention the bot in a comment to re-run the automated checks.
Please see the Pull Request Review Guide for a full explanation of the review process. The bot is tracking the review progress through labels, which are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Arrays.equals(fieldDataTypes, schema.fieldDataTypes);
Arrays.equals(fieldDataTypes, schema.fieldDataTypes) &&
Arrays.equals(primaryKey, schema.primaryKey) &&
Arrays.equals(uniqueKeys, schema.uniqueKeys);
Arrays.equals doesn't "work" for two dimensional arrays. Use Arrays.deepEquals instead.
Good catch! I also changed the hashCode to use Arrays.deepHashCode.
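As a side note (not part of the PR), a minimal standalone Java example of why nested arrays such as uniqueKeys need Arrays.deepEquals and Arrays.deepHashCode:

```java
import java.util.Arrays;

public class DeepEqualsDemo {
    public static void main(String[] args) {
        // Two structurally identical 2D arrays, e.g. two unique key lists.
        String[][] keys1 = {{"a"}, {"b", "c"}};
        String[][] keys2 = {{"a"}, {"b", "c"}};

        // Arrays.equals compares the inner arrays by reference (Object.equals),
        // so structurally equal nested arrays are reported as different.
        System.out.println(Arrays.equals(keys1, keys2));      // false
        System.out.println(Arrays.deepEquals(keys1, keys2));  // true

        // The same applies to hashing: deepHashCode hashes the contents.
        System.out.println(Arrays.deepHashCode(keys1) == Arrays.deepHashCode(keys2)); // true
    }
}
```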
.field("b", DataTypes.STRING())
.field("c", DataTypes.BIGINT());

String expected = "root\n |-- a: INT\n |-- b: STRING\n |-- c: BIGINT\n";
The field type should come with nullability information.
It seems we should modify DataType.toString().
The default is nullable, and nullability is not printed in toString(). If the type is not nullable, toString() will print INT NOT NULL.
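For illustration, a small sketch of the behavior described above, assuming the DataTypes API from flink-table-common:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class NullabilityDemo {
    public static void main(String[] args) {
        DataType nullable = DataTypes.INT();          // nullable by default
        DataType notNull = DataTypes.INT().notNull(); // explicitly NOT NULL

        // Nullability is omitted for the (default) nullable case.
        System.out.println(nullable); // INT
        System.out.println(notNull);  // INT NOT NULL
    }
}
```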
/**
 * Tests for {@link TableSchema}.
 */
public class TableSchemaTest {
Add an equals test (especially for uniqueKeys).
Sure
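For reference, a sketch of what such a test could look like; the uniqueKey(...) builder call follows the API discussed in this PR and is an assumption here:

```java
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.junit.Test;

public class TableSchemaEqualsTest {

    @Test
    public void testEqualsWithUniqueKeys() {
        // Same fields and same unique key: schemas should be equal.
        TableSchema schema1 = TableSchema.builder()
            .field("a", DataTypes.INT())
            .field("b", DataTypes.STRING())
            .uniqueKey("a")
            .build();

        TableSchema schema2 = TableSchema.builder()
            .field("a", DataTypes.INT())
            .field("b", DataTypes.STRING())
            .uniqueKey("a")
            .build();

        // Same fields but a different unique key: schemas should differ.
        TableSchema schema3 = TableSchema.builder()
            .field("a", DataTypes.INT())
            .field("b", DataTypes.STRING())
            .uniqueKey("b")
            .build();

        assertEquals(schema1, schema2);
        assertEquals(schema1.hashCode(), schema2.hashCode());
        assertNotEquals(schema1, schema3);
    }
}
```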
+1 LGTM, and I left some comments.
Thanks @docete, I have addressed the comments.
@wuchong thanks for this PR, I left some comments.
 * <p>A table schema that represents a table's structure with field names and data types and some
 * constraint information (e.g. primary key, unique key).</p><br/>
 *
 * <p>Concepts about primary key and unique key:</p>
Do we distinguish primary key from unique key? In the current Javadoc, there is no difference between them.
The difference between a primary key and a unique key is that there is only one primary key, while there can be more than one unique key. A primary key doesn't need to be declared in the unique key list again.
I will add this to the class Javadoc.
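For example, a hypothetical use of the builder methods proposed in this PR (the primaryKey/uniqueKey names are taken from the discussion and may differ from the final API):

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

public class SchemaWithKeysDemo {
    public static void main(String[] args) {
        // At most one primary key, but any number of unique keys,
        // and the primary key is not repeated in the unique key list.
        TableSchema schema = TableSchema.builder()
            .field("id", DataTypes.BIGINT().notNull())
            .field("name", DataTypes.STRING())
            .field("email", DataTypes.STRING())
            .primaryKey("id")           // assumed builder method from this PR
            .uniqueKey("email")         // assumed builder method from this PR
            .uniqueKey("name", "email") // composite unique key
            .build();

        System.out.println(schema);
    }
}
```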
throw new IllegalArgumentException("The field '" + field +
    "' is not existed in the schema.");
  }
}
That means we must build fieldNames first?
Yes. I think so.
if (uniqueKeys == null) {
    uniqueKeys = new ArrayList<>();
}
uniqueKeys.add(Arrays.asList(fields));
Should we deduplicate the unique keys?
Sure, I will add that.
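One possible way to do the deduplication (a minimal sketch, not necessarily the PR's actual implementation): keep the declared keys in an insertion-ordered set instead of a plain list.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class UniqueKeyDedupDemo {
    public static void main(String[] args) {
        // A LinkedHashSet drops duplicate unique key declarations while
        // preserving the declaration order.
        Set<List<String>> uniqueKeys = new LinkedHashSet<>();
        uniqueKeys.add(Arrays.asList("a", "b"));
        uniqueKeys.add(Arrays.asList("a", "b")); // duplicate, ignored
        uniqueKeys.add(Arrays.asList("c"));

        System.out.println(uniqueKeys); // [[a, b], [c]]
    }
}
```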
class TableSourceTable[T](
    val tableSource: TableSource[T],
    val isStreaming: Boolean,
    val statistic: FlinkStatistic)
If the statistic is deleted, how do we store the statistic from the catalog?
Another scenario: rules (like PushProjectIntoTableSourceScanRule) do not change statistics, so the new table created by the rule could reuse the statistics of the original TableSource and avoid calling the TableSource#getTableStats method, which is costly.
So the def copy(statistic: FlinkStatistic): FlinkTable method defined in FlinkTable should not be deleted either.
- The statistic from the catalog should be restored via TableSource#getTableStats().
- I think if the TableSource is not changed, then we don't need to re-construct a new TableSourceTable. We can reuse the original TableSourceTable and avoid calling TableSource#getTableStats again.
- TableSource should be decoupled from Catalog, or we should add a method like setTableStats to the TableSource interface.
- The TableSource may be changed, e.g. project push-down into the table source.
- Yes, maybe we need something like setTableStats, but this is out of the scope of this issue.
- If the TableSource is changed, shouldn't we always create a new TableSourceTable and call getTableStats() again? How do we know the stats are not changed?
- I do not think it's a good idea to add setTableStats to TableSource. One big reason is that each TableSource would have to implement this method, and a default implementation does not work because a TableSource does not know whether the catalog can provide its stats.
- Yes, TableSource is immutable in TableSourceTable. Currently, whether to use the original stats or unknown stats for the new TableSourceTable is decided in each rule. You can reference the related code in PushProjectIntoTableSourceScanRule and PushFilterIntoTableSourceScanRule in the Blink internal code.
types: Array[TypeInformation[_]],
fields: Array[String],
statistic: FlinkStatistic = FlinkStatistic.UNKNOWN): Table
statistic: TableStats,
We may add more info to FlinkStatistic in the future, so use FlinkStatistic instead of individual fields to make sure this method and the related test cases stay stable. relModifiedMonotonicity is also a member of FlinkStatistic and is not covered by this method.
If more information needs to be added to FlinkStatistic, I think it should also be included in TableStats. Regarding relModifiedMonotonicity, it is only used internally in the intermediate table source (IntermediateRelTable), which keeps FlinkStatistic as a constructor parameter.
What's the problem with using FlinkStatistic?
We removed FlinkStatistic from the constructor of TableSourceTable to make deriving the statistic simple. That's why we can't build a TableSourceTable from a FlinkStatistic.
new TableSourceTable[BaseRow](tableSource, false)
}
Delete the redundant blank line.
This PR will be split into several PRs. Let's continue with the first one in #10213.
What is the purpose of the change
The primary key and unique key are standard meta information in SQL, and they are important for optimization, for example AggregateRemove, AggregateReduceGrouping, and state layout optimization for TopN and Join.
In this PR, we extend TableSchema to carry more information about the primary key and unique keys, so that a TableSource can declare this meta information.
The primary key and unique key information currently only works in the Blink planner; the Flink planner will simply ignore it.
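For example, a hypothetical TableSource could expose the key information through its schema. This is a sketch using the builder methods proposed in this PR; the class and field names are made up:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

// Sketch only: a source exposing key constraints via its TableSchema so that
// the Blink planner can use them (e.g. for AggregateRemove); the Flink
// planner ignores them.
public class OrdersTableSource /* implements StreamTableSource<Row> */ {

    public TableSchema getTableSchema() {
        return TableSchema.builder()
            .field("order_id", DataTypes.BIGINT().notNull())
            .field("user_id", DataTypes.BIGINT())
            .field("amount", DataTypes.DECIMAL(10, 2))
            .primaryKey("order_id")           // assumed builder method from this PR
            .uniqueKey("user_id", "order_id") // assumed builder method from this PR
            .build();
    }
}
```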
Brief change log
This pull request contains 2 commits:
- Extend TableSchema to carry primary key and unique key information, with some Javadocs.
- Refactor TableSourceTable to make it only build the FlinkStatistic from the TableSource instead of passing it in from outside, along with some test changes.

Verifying this change
- Added TableSchemaTest to check the schema builder methods.
- Added the FlinkRelMdUniqueKeysTest#testGetUniqueKeysOnTableSourceScan test to check that the optimizer can get the unique key information from the TableSchema of the TableSource.

Does this pull request potentially affect one of the following parts:
- @Public(Evolving): (no)

Documentation