[FLINK-8558] [table] Add unified format interfaces and separate formats from connectors #6264
Conversation
This PR introduces a format discovery mechanism based on Java Service Providers. The general `TableFormatFactory` is similar to the existing table source discovery mechanism. However, it allows for arbitrary format interfaces that might be introduced in the future. At the moment, a connector can request configured instances of `DeserializationSchema` and `SerializationSchema`. In the future we can add interfaces such as a `Writer` or `KeyedSerializationSchema` without breaking backwards compatibility. This PR deprecates the existing strong coupling of connector and format for the Kafka table sources and table source factories. It introduces descriptor-based alternatives.
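The discovery mechanism described above can be sketched roughly as follows. This is a hypothetical, simplified illustration, not Flink's actual implementation: only `TableFormatFactory` and `requiredContext` come from the PR, the rest of the names are invented, and the real code obtains candidates via `java.util.ServiceLoader` and applies richer matching rules.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Simplified sketch of Java Service Provider based format discovery.
// In production, candidates would come from
// ServiceLoader.load(TableFormatFactory.class).
interface TableFormatFactory {
    // Properties that must be present with matching values for this
    // factory to be selected, e.g. {"format.type" -> "json"}.
    Map<String, String> requiredContext();
}

class FormatDiscovery {
    // Returns the first factory whose required context is satisfied
    // by the user-supplied properties.
    static Optional<TableFormatFactory> findMatching(
            List<TableFormatFactory> candidates, Map<String, String> properties) {
        for (TableFormatFactory factory : candidates) {
            boolean matches = factory.requiredContext().entrySet().stream()
                    .allMatch(e -> e.getValue().equals(properties.get(e.getKey())));
            if (matches) {
                return Optional.of(factory);
            }
        }
        return Optional.empty();
    }
}
```

Because matching is driven only by the property map, new factory sub-interfaces (e.g. for serialization) can be added later without changing the lookup mechanism.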
Sorry, this one is too big to review all at once. I reviewed roughly until `BatchTableEnvironment.scala`.
String proctimeAttribute,
List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
Map<String, String> fieldMapping,
String topic, Properties properties,
nit: new line between `topic` & `properties`
/**
 * Creates a Kafka 0.10 {@link StreamTableSource}.
 *
 * @param topic Kafka topic to consume.
 * @param properties Properties for the Kafka consumer.
 * @param deserializationSchema Deserialization schema to use for Kafka records.
 * @param typeInfo Type information describing the result type. The field names are used
 *                 to parse the JSON file and so are the types.
 * @param typeInfo Not relevant anymore.
Why is it not relevant? Doesn't it break backward compatibility? If so, it would be safer to drop the constructor altogether.
The only reason for keeping it would be if ALL old invocations still work the same as they used to, regardless of this value. However, in that case I would also be inclined to drop this constructor, since it's an easy change for the users, and the class was `@PublicEvolving` and is now `@Internal`.
The `typeInfo` should have been the same as the produced type that has been returned by the `deserializationSchema`. I'm also fine with dropping the constructor already.
 */
@Deprecated
I have mixed feelings about deprecating `PublicEvolving` classes.
`PublicEvolving` is not `Public`, so we can modify them if necessary. Additionally, this class should never have gotten this annotation, as we do not guarantee API stability for the Table API at the moment.
I agree that having both annotations looks weird. I will remove the `PublicEvolving`.
As we talked offline, I wasn't sure whether we could drop `PublicEvolving` classes instead of deprecating them. But if you want to deprecate them for one release, it might be a better idea to do so.
 */
@Deprecated
Hmm, why do you deprecate individual methods after already deprecating the whole class? Does it solve some problem? If not, I would revert those additional `@Deprecated` notes.
It does not harm to deprecate the methods as well for better visibility:
https://stackoverflow.com/questions/17615019/class-with-deprecated-annotation-means-that-all-methods-and-fields-automaticall
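To illustrate the point behind that link: in Java, a class-level `@Deprecated` is not propagated to the members themselves at the reflection level, which is one reason adding it to the methods too can improve visibility in tooling. The class and method names below are invented for the example.

```java
// Class-level deprecation does not annotate the members themselves.
@Deprecated
class OldKafkaSource {
    void consume() { }  // not annotated; only the class carries @Deprecated
}

@Deprecated
class OldKafkaSink {
    @Deprecated  // redundant for the compiler warning, but explicit per member
    void produce() { }
}
```

Reflection confirms the asymmetry: `OldKafkaSource.class` carries the annotation, but `OldKafkaSource#consume` does not.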
Yes, I assumed that it works the way you referred to, and it seemed to me that deprecating the class should suffice. But you are right, it doesn't harm (except for a larger commit to review).
@@ -55,50 +57,101 @@
 */
@Internal
public abstract class KafkaTableSource
    implements StreamTableSource<Row>, DefinedProctimeAttribute, DefinedRowtimeAttributes, DefinedFieldMapping {
nit: split interfaces into new lines?
if (!properties.contains(key)) {
  if (!isOptional) {
    throw new ValidationException(s"Could not find required property '$key'.")
  }
} else {
  TypeStringUtils.readTypeInfo(properties(key)) // throws validation exceptions
what's the purpose of this comment?
That we don't validate the string but let the parser do the work for us. I updated the comment.
 * @tparam T factory class type
 * @return configured instance from factory
 */
def find[T](
Split this method into smaller ones. Basically, everywhere you typed a comment:

```
// foo bar baz
some;
code;
block;
```

replace the comment with a method call:

```
def fooBarBaz() {
  some;
  code;
  block;
}
```
private def normalizeContext(factory: TableFormatFactory[_]): Map[String, String] = {
  val requiredContextJava = factory.requiredContext()
  if (requiredContextJava != null) {
same as below
 * Creates a Kafka 0.10 {@link StreamTableSource}.
 *
 * @param schema Schema of the produced table.
 * @param proctimeAttribute Field name of the processing time attribute, null if no
Replace null with Optional
This is kind of a controversial topic. Generally speaking, I suspect that Java discourages using `Optional` besides return values, because we should use `@Nullable` or not use either of them. However, in projects that ignored the `@Nullable` annotation (such as Flink), it's virtually impossible to start using it, and thus using `Optional` is the only way to have compiler control over optional/nullable fields.
In this particular use case of "optional" arguments my preference hierarchy is:

1. provide a builder for this class
2. provide an alternative constructor without this argument
3. use `@Nullable` with enabled compile errors on incorrectly handled `@Nullable` annotations
4. use `Optional`
5. use `@Nullable` WITHOUT compile errors on incorrectly handled `@Nullable` annotations
6. use a nullable argument without a `@Nullable` annotation

The last two options are out of the question for me, since option 6 is evil and option 5 doesn't improve the situation. Option 3 is sadly impossible for Flink.
The same logic applies for me to other use cases (like fields, return values, etc.):

1. avoid nulls/optionals (for example via builders or named parameters with default values)
2. use `@Nullable` with compiler errors
3. use `Optional`
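To make the top of that hierarchy concrete, here is a minimal sketch of option 2 (an alternative constructor instead of a nullable argument). The class and field names are invented for illustration; this is not the PR's actual `KafkaTableSource` API.

```java
import java.util.Optional;

// Option 2: an extra constructor so that callers without a proctime
// attribute never pass null at the call site.
class KafkaSourceConfig {
    private final String topic;
    private final Optional<String> proctimeAttribute;

    KafkaSourceConfig(String topic) {
        this(topic, null);
    }

    // The nullable parameter is confined to one place and normalized here.
    KafkaSourceConfig(String topic, String proctimeAttribute) {
        this.topic = topic;
        this.proctimeAttribute = Optional.ofNullable(proctimeAttribute);
    }

    String getTopic() {
        return topic;
    }

    // Readers of the field see an Optional, never a raw null.
    Optional<String> getProctimeAttribute() {
        return proctimeAttribute;
    }
}
```

The nullability is thus an internal detail of one constructor, while both the public constructors and the getter stay null-free.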
private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;

/** Mapping for the fields of the table schema to fields of the physical returned type or null. */
private Map<String, String> fieldMapping;
`Optional<Map<...>> fieldMapping`. Same goes for other nullable fields/parameters.
I added a `@Nullable` annotation.
Please check my other comment about that. `@Nullable` without compiler errors is not in any way better :(
Again only a partial review :( I didn't manage to look into the "Feedback addressed" commit. However, I managed to more or less fully review the first commit.
// prepare parameters for Kafka table source

final TableSchema schema = TableSchema.builder()
  .field("fruit-name", Types.STRING())
The benefit is deduplication of values like `"fruit-name"` within a single test (easier to change such constants), plus self-documenting code: using constants documents that all of those occurrences are indeed the same. Both help when debugging, refactoring, and extending the test in the future.
}

@Test(expected = classOf[AmbiguousTableFormatException])
def testAmbiguousFactory(): Unit = {
How does this differ from `testAmbiguousSchemaBasedSelection`?
I added a comment to the other test.
 *
 * An empty context means that the factory matches for all requests.
 */
def requiredContext(): util.Map[String, String]
I don't understand `Context` in this context. Rename to `requiredProperties`?
Context defines the "context" in which the factory will be activated.
I still do not understand this name. Could you think of something more descriptive? It seems to me that this method returns the set of properties that are required to match a given factory. Thus `requiredProperties` seems better, but maybe I'm missing something?
Context is more accurate, as the map also contains properties that are not required. E.g., a user does not have to specify `property-version` in YAML, but they can. The required context describes the context for which this factory was implemented. The factory service decides how to handle the context information.
override def requiredContext(): util.Map[String, String] = {
  val context = new util.HashMap[String, String]()
  context.put("format.type", "test-format")
I think you should extract (or reuse already extracted) constants for all of those strings like `format.type`, `format.path`, `format.important`, etc. Generally speaking, if a constant occurs more than once in code, it should be extracted.
Generally, you are right. The problem with having a static variable containing a property key, however, is that you can break backwards compatibility and all tests still automatically succeed, because they all reference the common variable.
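A tiny illustration of the trade-off described in that reply (class and key names are invented for the example): a check that pins the string literal catches an accidental rename of the externally visible key, whereas a check comparing the constant to itself would keep passing after the rename.

```java
// Production code: the property key that users write into their configs.
class FormatValidator {
    static final String FORMAT_TYPE = "format.type";
}

class CompatibilityCheck {
    // Pins the externally visible key as a literal. If FORMAT_TYPE were
    // renamed to "format.kind" in a refactoring, this returns false and
    // the test fails; a test asserting FORMAT_TYPE.equals(FORMAT_TYPE)
    // would silently keep passing.
    static boolean keyIsStable() {
        return "format.type".equals(FormatValidator.FORMAT_TYPE);
    }
}
```

This is why the reply prefers repeated literals over a shared constant specifically in backwards-compatibility tests.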
/**
 * Table source factory for testing with a wildcard format ("format.*").
 */
class TestWildcardFormatTableFactory extends TableSourceFactory[Row] {
Where is this class used?
props.put(FORMAT_TYPE, "test")
props.put("format.type", "not-test")
props.put("format.not-test-property", "wildcard-property")
TableSourceFactoryService.findAndCreateTableSource(props.toMap)
Assert that `TestWildcardFormatTableFactory` was found?
if (requiredContextJava != null) {
  requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2)).toMap
} else {
  Map[String, String]()
`checkNotNull(requiredContextJava)`? The interface doesn't seem to allow for nulls.
Thanks for the in-depth review @pnowojski. I hope I could address most of your comments. Since this PR heavily overlaps with #6201, and that PR needs a review and some additional work as well, I will close this PR for now and open a PR with a clean, unified table sources/sinks/formats story. We can continue the discussions here, and I will make sure that the changes are applied to the new PR as well.
What is the purpose of the change

This PR introduces a format discovery mechanism based on Java Service Providers. The general `TableFormatFactory` is similar to the existing table source discovery mechanism. However, it allows for arbitrary format interfaces that might be introduced in the future. At the moment, a connector can request configured instances of `DeserializationSchema` and `SerializationSchema`. In the future we can add interfaces such as a `Writer` or `KeyedSerializationSchema` without breaking backwards compatibility.

This PR deprecates the existing strong coupling of connector and format for the Kafka table sources and table source factories. It introduces descriptor-based alternatives.

Brief change log

- `TableFormatService` with `TableFormatFactory` and specific `DeserializationSchemaFactory` and `SerializationSchemaFactory`

Verifying this change

Does this pull request potentially affect one of the following parts:

- `@Public(Evolving)`: yes

Documentation