-
Notifications
You must be signed in to change notification settings - Fork 13k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-17887][table][connector] Improve interface of ScanFormatFactory and SinkFormatFactory #12320
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit ea7a259 (Mon May 25 12:44:15 UTC 2020) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
When I writing the code, I have a feeling that maybe |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @wuchong. I like many of the refactorings around the naming and unification of DynamicTableSource
and DynamicTableSink
. However, I think we should have some hierarchy in the format interfaces. It is very likely that we will have more than one format in the future. In theory each connector could define its own kind of format interface. Those will have the same createXX(..)
signature but different runtime interface. Having methods like discoverDeserializationProvider, discoverOptionalDeserializationProvider
in the util are too specific (also seen by the name). Instead, we should parameterize these utility methods with the factory people are looking for. We should just have an upper SourceFormat
and SinkFormat
interface.
Hi @twalthr , I can see the benefit to have a hierarchy in the format interfaces. What do you think about to use the first proposal in our offline discussion? That |
+1 for |
3153508
to
1ad6eea
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 I just found a couple of formatting issues after the refactoring
return new Kafka011DynamicSink( | ||
consumedDataType, | ||
topic, | ||
properties, | ||
partitioner, | ||
sinkFormat); | ||
encodingFormat); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix wrong indention here and at other locations
this.consumedDataType = Preconditions.checkNotNull(consumedDataType, "Consumed data type must not be null."); | ||
this.topic = Preconditions.checkNotNull(topic, "Topic must not be null."); | ||
this.properties = Preconditions.checkNotNull(properties, "Properties must not be null."); | ||
this.partitioner = Preconditions.checkNotNull(partitioner, "Partitioner must not be null."); | ||
this.sinkFormat = Preconditions.checkNotNull(sinkFormat, "Sink format must not be null."); | ||
this.encodingFormat = Preconditions.checkNotNull(encodingFormat, "Sink format must not be null."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update error message?
@@ -85,7 +85,7 @@ | |||
* @param outputDataType Source produced data type | |||
* @param topic Kafka topic to consume. | |||
* @param properties Properties for the Kafka consumer. | |||
* @param scanFormat Scan format for decoding records from Kafka. | |||
* @param decodingFormat Scan format for decoding records from Kafka. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix wrong indention and remove "Scan" from Javadoc?
import org.apache.flink.table.types.DataType; | ||
|
||
/** | ||
* A {@link Format} for a {@link ScanTableSource}. | ||
* A {@link Format} for a {@link DynamicTableSource} to reading rows. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: for reading rows
@@ -34,15 +34,15 @@ | |||
* | |||
* <p>Formats can be distinguished along two dimensions: | |||
* <ul> | |||
* <li>Context in which the format is applied (e.g. {@link ScanTableSource} or {@link DynamicTableSink}). | |||
* <li>Context in which the format is applied (e.g. {@link DynamicTableSource} or {@link DynamicTableSink}). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can remove the e.g.
it is only for those two locations now
public final @Nullable ScanFormat<DeserializationSchema<RowData>> sourceKeyFormat; | ||
public final ScanFormat<DeserializationSchema<RowData>> sourceValueFormat; | ||
public final @Nullable | ||
DecodingFormat<DeserializationSchema<RowData>> sourceKeyFormat; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix formatting here and below
…y and SinkFormatFactory 1. Have a common interface DynamicTableSource.Context, and make Context of ScanTableSource and LookupTableSource extend it, and rename them to LookupContext and ScanContext 2. Change parameter of ScanFormat.createScanFormat from ScanTableSource.Context to DynamicTableSource.Context 3. Rename ScanFormat.createScanFormat to DecodingFormat#createRuntimeDecoder() 4. Rename SinkFormat.createSinkFormat to EncodingFormat#createRuntimeEncoder() 5. Rename ScanFormatFactory to DecodingFormatFactory 6. Rename SinkFormatFactory to EncodingFormatFactory
80bfdea
to
29cda13
Compare
…y and SinkFormatFactory We improved the interfaces with the following changes: 1. Have a common interface DynamicTableSource.Context, and make Context of ScanTableSource and LookupTableSource extend it, and rename them to LookupContext and ScanContext 2. Change parameter of ScanFormat.createScanFormat from ScanTableSource.Context to DynamicTableSource.Context 3. Rename ScanFormat.createScanFormat to DecodingFormat#createRuntimeDecoder() 4. Rename SinkFormat.createSinkFormat to EncodingFormat#createRuntimeEncoder() 5. Rename ScanFormatFactory to DecodingFormatFactory 6. Rename SinkFormatFactory to EncodingFormatFactory This closes apache#12320
…y and SinkFormatFactory We improved the interfaces with the following changes: 1. Have a common interface DynamicTableSource.Context, and make Context of ScanTableSource and LookupTableSource extend it, and rename them to LookupContext and ScanContext 2. Change parameter of ScanFormat.createScanFormat from ScanTableSource.Context to DynamicTableSource.Context 3. Rename ScanFormat.createScanFormat to DecodingFormat#createRuntimeDecoder() 4. Rename SinkFormat.createSinkFormat to EncodingFormat#createRuntimeEncoder() 5. Rename ScanFormatFactory to DecodingFormatFactory 6. Rename SinkFormatFactory to EncodingFormatFactory This closes apache#12320
…y and SinkFormatFactory We improved the interfaces with the following changes: 1. Have a common interface DynamicTableSource.Context, and make Context of ScanTableSource and LookupTableSource extend it, and rename them to LookupContext and ScanContext 2. Change parameter of ScanFormat.createScanFormat from ScanTableSource.Context to DynamicTableSource.Context 3. Rename ScanFormat.createScanFormat to DecodingFormat#createRuntimeDecoder() 4. Rename SinkFormat.createSinkFormat to EncodingFormat#createRuntimeEncoder() 5. Rename ScanFormatFactory to DecodingFormatFactory 6. Rename SinkFormatFactory to EncodingFormatFactory This closes apache#12320
What is the purpose of the change
This pull request is for improving current ScanForamtFactory and SinkFormatFactory interfaces with the following problems:
Brief change log
Verifying this change
This change is a trivial rework and is covered by existing tests.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation