Skip to content

Conversation

@wuchong
Copy link
Member

@wuchong wuchong commented Nov 15, 2019

What is the purpose of the change

The primary key is a standard meta information in SQL. And they are important information for optimization, for example, AggregateRemove, AggregateReduceGrouping and state layout optimization for TopN and Join.

So in this PR, we extend TableSchema to carry more information about primary key.

Bridge the primary key of TableSchema to the planner metadata will be done in other PRs.

Brief change log

  1. Add primary key to TableSchema with some Javadocs.

Verifying this change

  1. Adds some tests in TableSchemaTest to check the primary key.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (not documented)

@flinkbot
Copy link
Collaborator

flinkbot commented Nov 15, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit f98778e (Wed Dec 04 15:23:43 UTC 2019)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@KurtYoung
Copy link
Contributor

cc @twalthr @dawidwys

@flinkbot
Copy link
Collaborator

flinkbot commented Nov 15, 2019

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build

@dawidwys dawidwys self-assigned this Nov 15, 2019
@wuchong wuchong changed the title [FLINK-12846][table-common] Carry primary key and unique key information in TableSchema [FLINK-12846][table-common] Carry primary key information in TableSchema Nov 15, 2019
@wuchong
Copy link
Member Author

wuchong commented Nov 15, 2019

I dropped unique key information, because we only need to support primary key for TPC-DS optimizations.

Copy link
Contributor

@dawidwys dawidwys left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I put some comments to the code already, but wanted to ask if those changes are part of any accepted FLIP?

I have some concerns if the concept of PRIMARY KEYS was announced/discussed publicly. I remember reading a document(I think it was from you @wuchong) about different source/sink concepts such as primary key/changeflag etc., but I could not find any references.

* <p>Concepts about primary key and unique key:</p>
* <ul>
* <li>
* Primary key and unique key can consist of single or multiple columns (fields).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Primary key and unique key can consist of single or multiple columns (fields).
* A primary key and a unique key can consist of a single or multiple columns (fields).

Comment on lines 57 to 62
* A primary key or unique key on source will be simply trusted, we won't validate the
* constraint. The primary key and unique key information will then be used for query
* optimization. If a bounded or unbounded table source defines any primary key or
* unique key, it must contain a unique value for each row of data. You cannot have
* two records having the same value of that field(s). Otherwise, the result of query
* might be wrong.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe?:
"The primary and unique keys' constraints of a source table are not validated. They are assumed to be correct and used for query optimization. If a bounded or unbounded table source defines any primary or
unique key, it must contain a unique value for each row of data. You cannot have
two records having the same value of that field(s). Otherwise, the result of the query
might be wrong."

What about retract/changelog streams? Do we just assume that changeflag + timestamp are part of every key? I understand this will be handled correctly in the runtime, but I think this should be also explained in this javadoc.

* might be wrong.
* </li>
* <li>
* A primary key or unique key on sink is a weak constraint. Currently, we won't validate
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't necessarily understand the first sentence. What do you mean by weak constraint? Is it weak because we do not support that yet?

* compile time.
* </li>
* <li>
* The difference between primary key and unique key is that there can be only one primary
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* The difference between primary key and unique key is that there can be only one primary
* The difference between a primary key and a unique key is that there can only be one primary

* </li>
* <li>
* The difference between primary key and unique key is that there can be only one primary
* key and there can be more than one unique key. And a primary key doesn't need to be
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* key and there can be more than one unique key. And a primary key doesn't need to be
* key whereas there can be more than one unique key. A primary key doesn't need to be

@Test
public void testInvalidMultiPrimaryKey() {
thrown.expectMessage(
"A primary key [a] have been defined, can not define another primary key [b]");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"A primary key [a] have been defined, can not define another primary key [b]");
"A primary key [a] has already been defined. Can not define another primary key [b]");

@Test
public void testInvalidPrimaryKeyNestedFieldName() {
thrown.expectMessage(
"The primary key field 'c.q1' is not existed in the schema");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"The primary key field 'c.q1' is not existed in the schema");
"The primary key field 'c.q1' does not exist in the schema");


@Test
public void testInvalidPrimaryKeyFieldName() {
thrown.expectMessage("The primary key field 'd' is not existed in the schema");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
thrown.expectMessage("The primary key field 'd' is not existed in the schema");
thrown.expectMessage("The primary key field 'd' does not exist in the schema");

/**
* Utilities for {@link org.apache.flink.table.api.TableSchema} validation.
*/
public final class TableSchemaValidation {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Internal

.collect(Collectors.toMap(TableColumn::getName, Function.identity()));

// validate primary key
for (String key : primaryKey) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we validate fields in a key are unique? I think it does not make sense to define the same field in a key multiple times.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I think we should validate the key fields are unique.

@wuchong
Copy link
Member Author

wuchong commented Nov 17, 2019

Thanks @dawidwys for the reviewing. I agree with that the concepts of primary key on source/sink should be discussed and accepted publicly. But the background of this issue is that we would like to support more optimizations for TPC-DS where the sources have primary key information in Hive metastore. So we would like to only expose the primary key interface to CatalogTable, to let the Hive catalog pass the primary key information which is defined by Hive DDL to Flink. If a primary key is defined on Flink DDL or TableSource#getTableSchema or TableSink#getTableSchema, an unsupported exception will be thrown. In this way, I think the primary key concepts is simple and clear which is similar to Hive's definition.

But I'm not sure whether this still requires a FLIP. What do you think?

Copy link
Contributor

@sjwiesman sjwiesman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a few small suggestions to the javadoc. I believe this looks good to use in future docs.

* Primary key can be consist of single or multiple columns (fields).
* </li>
* <li>
* A primary key on source will be simply trusted, we won't validate the constraint.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* A primary key on source will be simply trusted, we won't validate the constraint.
* A primary key on a source table will simply be trusted, we won't validate the constraint.

* A primary key on source will be simply trusted, we won't validate the constraint.
* The primary key information will then be used for query optimization. If a bounded
* or unbounded table source defines any primary key, it must contain a unique value
* for each row of data. You cannot have two records having the same value of that field(s).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* for each row of data. You cannot have two records having the same value of that field(s).
* for each row of data. If your table contains multiple rows with the same primary key values

* The primary key information will then be used for query optimization. If a bounded
* or unbounded table source defines any primary key, it must contain a unique value
* for each row of data. You cannot have two records having the same value of that field(s).
* Otherwise, the result of query might be wrong.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Otherwise, the result of query might be wrong.
* then the result of the query might be wrong.

* Otherwise, the result of query might be wrong.
* </li>
* <li>
* A primary key on sink is a weak constraint. Currently, we won't validate the constraint,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* A primary key on sink is a weak constraint. Currently, we won't validate the constraint,
* A primary key on sink is a weak constraint and is not validated by Flink. However,

* </li>
* <li>
* A primary key on sink is a weak constraint. Currently, we won't validate the constraint,
* but we may add some check in the future to validate whether the primary key of the query
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* but we may add some check in the future to validate whether the primary key of the query
* we may add some checks in the future to validate whether the primary key of the query

@dawidwys
Copy link
Contributor

Hi @wuchong.
First of all thank you for outlining the original purpose of this change. I am a bit hesitant though to introduce a new ground concept to a core part of the Table ecosystem without a good understanding of it. That's why I would appreciate an agreement within community. I do want to emphasize I definitely want to help with finding a solution that we can introduce in 1.10 that can improve the TPC-DS performance.

I did a bit of research on the PRIMARY & UNIQUE keys. I have one concern regarding the semantics of a PRIMARY KEY. What I found out is that according to SQL standard & most of RDBM systems, PRIMARY KEY constraint enforces NOT NULL constraint on all columns that are part of the PRIMARY KEY. That's not the case for UNIQUE keys, but there it is assumed that null values are not equal regarding that constraint (you may have multiple rows having a null unique key). Unfortunately this is not reflected in Hive. In hive (at least pre 3.0) there was/is no support of NOT NULL types. Therefore PRIMARY KEY in hive is actually not a PRIMARY KEY in a SQL standard sense. I think just for that issue it would be good to bring it to the ML.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants