[FLINK-10755][table] Port external catalogs in Table API extension points to flink-table-common #7848
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process.
Details: The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands: The @flinkbot bot supports the following commands:
|
|
@twalthr Have created a PR to port external catalogs to flink-table-common. Could you help to take a look when it's convenient for you? Thanks in advance. :) |
bowenli86
left a comment
Thanks for working on this PR!
Just sharing my thoughts on the annotation. I don't think we should add annotations to the existing catalog-related classes, given that 1) they were not annotated before, and 2) they will soon be replaced by the new catalog APIs.
| * <p>It provides information about catalogs, databases and tables such as names, schema, | ||
| * statistics, and access information. | ||
| */ | ||
| @PublicEvolving |
remove annotation
| * | ||
| * <p>Use {@link ExternalCatalogTableBuilder} to integrate with the normalized descriptor-based API. | ||
| */ | ||
| @PublicEvolving |
remove annotation
3dcfe1f to
7d51f47
Compare
|
@bowenli86 Thanks a lot for the suggestions. I'm fine to remove the annotation of |
|
@flinkbot approve description This should be rebased after all commits of FLINK-11449 have been merged. |
|
@sunjincheng121 Thanks a lot for the review. I will rebase the PR after |
7d51f47 to
423f83a
Compare
|
@sunjincheng121 @twalthr I have rebased the PR since FLINK-11449 has been merged. Could you help to take a look at this PR? Thanks in advance. |
f966ea9 to
683b836
Compare
|
@bowenli86 thanks for your suggestion. I agree that the catalog API will change in the near future but this doesn't mean that we can't annotate it. The annotation allows users to know what is intended to be used or not. The current catalog APIs are still intended to be used. The |
|
@twalthr re: The current catalog APIs are still intended to be used. My team's plan for rolling out the unified catalog APIs is to build them separately without touching the existing ExternalCatalog APIs and, once they have the full capabilities of the existing catalog APIs, simply switch over. That's how we did it internally, and of course we can discuss if you have a different rollout plan. Anyway, that was the background I had in mind at the time. With that, I was concerned that japicmp might report errors when changing a class's annotation from PublicEvolving to Deprecated, so I did some testing locally and it seems to be fine. I think it's OK to mark them as PublicEvolving. It's a bit hard to say, as everything is moving so fast in Flink table/SQL and in our planned timeline. |
|
@bowenli86 Having a completely new interface for the unified catalog APIs and performing the switch later is completely fine. This is also what Xuefu and I had discussed offline. As far as I know, |
twalthr
left a comment
Thank you @dianfu. I added a couple of comments. My biggest concern right now is that this PR does much more than just port the external catalog interfaces: it also changes the rowtime descriptors and field computer interfaces. If we had skipped the ExternalCatalogBuilder as discussed initially, we could have avoided porting a lot of classes that might change very soon anyway due to the unified catalog API.
| * <p>The following example shows how to read from a connector using a JSON format and | ||
| * declaring it as a table source: | ||
| * | ||
| * <p>{@code |
Use <pre>{@code …}</pre> instead. See https://reflectoring.io/howto-format-code-snippets-in-javadoc/.
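To illustrate the suggestion, here is a minimal sketch of the two Javadoc forms. `ExampleDescriptor`, its methods, and the property keys are hypothetical names made up for this illustration; they are not classes from the PR. Multi-line snippets go in `<pre>{@code ...}</pre>` so that line breaks and indentation survive and generics need no HTML escaping, while short single-line references use inline `{@code ...}`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical descriptor, used only to illustrate Javadoc formatting.
 *
 * <p>The following example shows how to declare a connector with a JSON format:
 *
 * <pre>{@code
 * Map<String, String> props = new ExampleDescriptor()
 *     .connector("my-connector")
 *     .format("json")
 *     .toProperties();
 * }</pre>
 */
final class ExampleDescriptor {

    // Insertion-ordered so the output is stable when printed.
    private final Map<String, String> properties = new LinkedHashMap<>();

    /** Sets the connector type, e.g. {@code connector("my-connector")}. */
    ExampleDescriptor connector(String type) {
        properties.put("connector.type", type);
        return this;
    }

    /** Sets the format type, e.g. {@code format("json")}. */
    ExampleDescriptor format(String type) {
        properties.put("format.type", type);
        return this;
    }

    /** Returns a copy of the collected properties. */
    Map<String, String> toProperties() {
        return new LinkedHashMap<>(properties);
    }
}
```

With `<p>{@code ...}` the rendered HTML collapses the snippet into a single line, which is why the block form is preferable for anything longer than one expression.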
| // add cast to requested type and convert expression to RexNode | ||
| // TODO we cast to planner expressions as a temporary solution to keep the old interfaces | ||
| val rexExpression = Cast(expression.asInstanceOf[PlannerExpression], resultType) | ||
| val rexExpression = ApiExpressionUtils |
I would leave a TODO here as it reminds us that this is still not the final solution.
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>org.apache.flink</groupId> |
Can we remove dependencies to flink-table-api-java-bridge or flink-table-planner already now? I think the elasticsearch module should be able to work with flink-table-common classes only, right?
It still depends on flink-table-api-java-bridge and flink-table-planner as Elasticsearch6UpsertTableSinkFactory implements StreamTableSinkFactory.
| * @param extractor The {@link TimestampExtractor} to extract the rowtime attribute | ||
| * from the physical type. | ||
| */ | ||
| public Rowtime timestampsFromExtractor(TimestampExtractor extractor) { |
This changes the original logic. Before we were only serializing as the last option. Serializing is dangerous as we might not be able to restore the base64 string. Please reintroduce the original logic.
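As a rough sketch of what "serializing only as the last option" can look like: known extractor types are written as plain, human-readable properties, and Java serialization to a base64 string is used only for types the descriptor does not recognize. Everything below is a simplified stand-in written for this illustration; the nested types, the `toProperties` helper, and the property keys are assumptions, not the actual Flink classes or the code under review.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

final class RowtimeSketch {

    /** Hypothetical stand-in for a timestamp extractor interface. */
    interface TimestampExtractor extends Serializable {}

    /** Known type: reads the timestamp from an existing field. */
    static final class ExistingField implements TimestampExtractor {
        final String field;
        ExistingField(String field) { this.field = field; }
    }

    /** Known type: uses the timestamp attached to the stream record. */
    static final class StreamRecordTimestamp implements TimestampExtractor {}

    /** Unknown, user-defined type. */
    static final class CustomExtractor implements TimestampExtractor {}

    /** Converts an extractor to properties; the keys are illustrative only. */
    static Map<String, String> toProperties(TimestampExtractor extractor) {
        Map<String, String> props = new LinkedHashMap<>();
        if (extractor instanceof ExistingField) {
            // Preferred: a stable, readable representation.
            props.put("rowtime.timestamps.type", "from-field");
            props.put("rowtime.timestamps.from", ((ExistingField) extractor).field);
        } else if (extractor instanceof StreamRecordTimestamp) {
            props.put("rowtime.timestamps.type", "from-source");
        } else {
            // Last resort: the base64 string may not be restorable if the class changes.
            props.put("rowtime.timestamps.type", "custom");
            props.put("rowtime.timestamps.class", extractor.getClass().getName());
            props.put("rowtime.timestamps.serialized", serialize(extractor));
        }
        return props;
    }

    /** Java-serializes the object and encodes the bytes as URL-safe base64. */
    static String serialize(Serializable obj) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
            out.flush(); // ObjectOutputStream buffers internally
            return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The ordering of the branches is the point: the serialized form depends on class names and serial version UIDs, so it is kept out of the property map whenever a declarative representation exists.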
| * Sets a custom watermark strategy to be used for the rowtime attribute. | ||
| */ | ||
| public Rowtime watermarksFromStrategy(WatermarkStrategy strategy) { | ||
| internalProperties.putString( |
This changes the original logic. Before we were only serializing as the last option. Serializing is dangerous as we might not be able to restore the base64 string. Please reintroduce the original logic.
| private final DescriptorProperties internalProperties = new DescriptorProperties(true); | ||
|
|
||
| /** | ||
| * These constants will be removed once RowtimeValidator is ported to flink-table-common. |
Is it too much effort to port the RowtimeValidator now?
| * Specifies the origin of the previously defined field. The origin field is defined by a | ||
| * connector or format. | ||
| * | ||
| * <p>E.g. field("myString", Types.STRING).from("CSV_MY_STRING") |
Use {@code field(...)} for all code examples in those methods.
| * @param fieldAccesses Field access expressions for the argument fields. | ||
| * @return The expression to extract the timestamp from the {@link TableSource} return type. | ||
| */ | ||
| public abstract Expression getExpression(FieldReferenceExpression[] fieldAccesses); |
I had this discussion with @sunjincheng121 before: this breaks the current interfaces and also changes the serialization format of serialized timestamp extractors. I'm not happy with these changes, but I also see that we need progress here.
In any case, I would like to get such breaking changes communicated through the description of the PR.
If we would simply postpone the ExternalCatalogBuilder porting, most of the work of this PR would not have been necessary.
| "50YWJsZS5zb3VyY2VzLndtc3RyYXRlZ2llcy5QdW5jdHVhdGVkV2F0ZXJtYXJrQXNzaWduZXKBUc57oaWu9A" + | ||
| "IAAHhyAD1vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnNvdXJjZXMud21zdHJhdGVnaWVzLldhdGVybWFya1N0cm" + | ||
| "F0ZWd5mB_uSxDZ8-MCAAB4cA") | ||
| "F0ZWd57RqMYVyVWhUCAAB4cA") |
We should not break tests in "porting" pull requests.
|
@twalthr Thanks a lot for the review. Considering that this change will break the API of FieldComputer and the serialization of Rowtime, I'd like to take the original solution and avoid the changes of ExternalCatalogBuilder. What do you think? I intended to avoid removing the method ExternalCatalogTable.builder() if possible (as done in the current PR), but it seems that removing it would be a nicer solution. |
…ints to flink-table-common
683b836 to
1f5e941
Compare
|
@dianfu thanks for your feedback. Yes, I think we should always aim for small and less invasive steps. The external catalog interfaces will be deprecated anyway soon. If we change the rowtime descriptor and field computer interfaces, we should do it as part of a big table source refactoring. The most important goal is to unblock |
|
@twalthr That makes a lot of sense to me. I have updated the PR and limited the changes to a minimum. Looking forward to your feedback. :) |
twalthr
left a comment
…ints to flink-table-common This closes apache#7848.
What is the purpose of the change
This pull request ports the external catalogs to flink-table-common.
Brief change log
Verifying this change
This change is a code cleanup without any test coverage.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)
Documentation