Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-33495][FLINK-33496] Add DISTRIBUTED BY clause #24155

Closed
wants to merge 20 commits into from

Conversation

jnh5y
Copy link
Contributor

@jnh5y jnh5y commented Jan 19, 2024

  • Adds DISTRIBUTED BY clause to CREATE TABLE
  • Adds Distribution support to CatalogTable
  • Add classes
  • TableDistribution
  • SupportsBucketing

Adds distribution to TableDescriptor.

This includes validation.

What is the purpose of the change

This implements part of FLIP-376. Namely it adds the Table API and SQL language support for DISTRIBUTED BY.

Brief change log

  • Adds DISTRIBUTED BY clause to CREATE TABLE
  • Adds Distribution support to CatalogTable
  • Add classes
  • TableDistribution
  • SupportsBucketing

Adds distribution to TableDescriptor.

Verifying this change

This change added tests and can be verified as follows:

Added tests to cover

  • Parsing and unparsing SQL.
  • TableDescriptor use
  • Table resolution and validation
  • MergeTableLike (CREATE TABLE LIKE)
  • Operation conversion
  • Use with the TestValues connector.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? (JavaDocs)

@jnh5y
Copy link
Contributor Author

jnh5y commented Jan 19, 2024

This implements the first part of FLINK-33494 / FLIP-376.

Notably, the things left to do are:
FLINK-34172 (ALTER TABLE support)

@flinkbot
Copy link
Collaborator

flinkbot commented Jan 19, 2024

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@jnh5y jnh5y force-pushed the flink-34494 branch 3 times, most recently from d0310b7 to c5ba1ef Compare January 19, 2024 23:16
@jnh5y
Copy link
Contributor Author

jnh5y commented Jan 19, 2024

@flinkbot run azure

* Adds DISTRIBUTED BY clause to CREATE TABLE
* Adds Distribution support to CatalogTable
* Add classes
- TableDistribution
- SupportsBucketing

Adds distribution to TableDescriptor.

This includes validation.
Copy link
Contributor

@twalthr twalthr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome work @jnh5y. I added a bunch of comments. The biggest is around moving the interpretation of the algorithm strategy to a later stage (from parsing maybe to converting to CatalogTable). In the future, we might support DISTRIBUTED BY HASH(YEAR(timestamp)) which is why we shouldn't reserve HASH as a keyword but treat it as an identifier instead. This should already allow

DISTRIBUTED BY `HASH`(a, b)

but we won't go further (i.e. not parsing it as an expression).

"DRAIN"
"ENFORCED"
"ESTIMATED_COST"
"EXTENDED"
"FUNCTIONS"
"HASH"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HASH doesn't need to be a reserved keyword, it could be a identifier (similar to functions like HASH()). A later layer could check for the only two supported algorithms.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I tried removing HASH as a reserved keyword.

I am hitting issues around RANGE being a keyword. I'm trying to read the distributionKind using SqlIdentifier and I haven't quite figured out how to make that work.

The FLIP didn't have the algorithm as quoted, so that's why I added HASH as a reserved keyword. Are you suggesting that algorithm (if supplied) must be quoted?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't aware that RANGE is already a reserved keyword. This of course changes the picture, we don't want to force users using backticks by default. If this is the case, I guess we have to make HASH a reserved keyword as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know the difference between nonReservedKeywords, reservedKeywords, and keywords? I'm wondering if it at least makes sense to still allow hash as a column name without backticks if we add it to the right list. We currently have too many keywords which basically forces the user to use backticks all the time.

@@ -520,6 +526,7 @@
# Please keep the keyword in alphabetical order if new keyword is added.
nonReservedKeywordsToAdd: [
# not in core, added in Flink
"BUCKETS"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is only BUCKETS added here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll admit that I don't have a complete understanding of the lists in Parser.tdd. Which other keywords should be added here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should only add or modify code if you know what you are doing ;-)

* No properties. This only checks whether the interface is implemented again during deserialization
*/
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeName("Bucketing")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this property good for? other spec don't have it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you referring to @JsonTypeName("Bucketing") or something else?

I'll admit that I copied this class directly from FLIP-376. PartitioningSpec does have a JsonTypeName.

(If you are referring to JsonIgnoreProperties, that is in plenty of other classes. I could imagine removing it from this class since we only expect to use this as a marker interface. Thoughts?)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you referring to @JsonTypeName("Bucketing") or something else?

Yes, I also don't recall why I added it. Might be a mistake or feedback that I got during ML discussion. In any case we should only add code if we know what it is doing (and we want it that way).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, figured it out... @JsonTypeName("Bucketing") overrides using the class name.

We either get

        "type" : "BucketingSpec"

or

        "type" : "Bucketing"

In the compiled plan.

I think that the override makes sense.

@@ -334,6 +352,12 @@ public Builder format(
return this;
}

/** Define which columns this table is distributed by. */
public Builder distributedBy(CatalogTable.TableDistribution tableDistribution) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but just distributedInto as in SQL

* No properties. This only checks whether the interface is implemented again during deserialization
*/
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeName("Bucketing")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you referring to @JsonTypeName("Bucketing") or something else?

Yes, I also don't recall why I added it. Might be a mistake or feedback that I got during ML discussion. In any case we should only add code if we know what it is doing (and we want it that way).

Copy link
Contributor

@twalthr twalthr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks @jnh5y for updating all the different locations.

@jnh5y
Copy link
Contributor Author

jnh5y commented Feb 5, 2024

LGTM, thanks @jnh5y for updating all the different locations.

@twalthr thanks for the review and help!

twalthr pushed a commit to twalthr/flink that referenced this pull request Feb 12, 2024
- Adds distribution support to CatalogTable
- Adds connector ability SupportsBucketing
- Adds distribution to TableDescriptor.

This closes apache#24155.
twalthr pushed a commit to twalthr/flink that referenced this pull request Feb 13, 2024
- Adds distribution support to CatalogTable
- Adds connector ability SupportsBucketing
- Adds distribution to TableDescriptor.

This closes apache#24155.
twalthr pushed a commit to twalthr/flink that referenced this pull request Feb 19, 2024
- Adds distribution support to CatalogTable
- Adds connector ability SupportsBucketing
- Adds distribution to TableDescriptor.

This closes apache#24155.
@twalthr twalthr closed this in 6c9ac5c Feb 19, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants