[FLINK-8558] [FLINK-8866] [table] Finalize unified table source/sink/format interfaces #6323
Conversation
…ts from connectors This PR introduces a format discovery mechanism based on Java Service Providers. The general `TableFormatFactory` is similar to the existing table source discovery mechanism. However, it allows for arbitrary format interfaces that might be introduced in the future. At the moment, a connector can request configured instances of `DeserializationSchema` and `SerializationSchema`. In the future, we can add interfaces such as a `Writer` or `KeyedSerializationSchema` without breaking backwards compatibility. This PR deprecates the existing strong coupling of connector and format for the Kafka table sources and table source factories. It introduces descriptor-based alternatives.
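The discovery mechanism described above can be sketched with Java's `ServiceLoader`, which is the standard way to implement Java Service Providers. The names `FormatFactory`, `matches`, and `FormatDiscovery` below are illustrative stand-ins, not Flink's actual API:

```java
import java.util.Map;
import java.util.ServiceLoader;

// Hypothetical, simplified sketch of service-provider-based format discovery.
interface FormatFactory {
    // A factory declares which connector properties it can handle.
    boolean matches(Map<String, String> properties);
}

class FormatDiscovery {
    // Scan the classpath for factories registered under
    // META-INF/services/FormatFactory and return the first match.
    static FormatFactory find(Map<String, String> properties) {
        for (FormatFactory factory : ServiceLoader.load(FormatFactory.class)) {
            if (factory.matches(properties)) {
                return factory;
            }
        }
        throw new IllegalArgumentException("No matching format factory found.");
    }
}
```

Because factories are looked up by interface rather than by concrete class, new format interfaces can be added later without breaking existing connectors.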
…tiate TableSinks This closes apache#6201.
Thanks for the update. With this refactor, generally speaking, it's looking much better :) Please clean up your 6 latest commits:
- the first commit, `Rename to TableFactory and move it to factories package`, is fine
- `Refactor format factories` probably should be squashed with the first one
- `Unify table factories`, `Move table type out of descriptors`, and `Make source/sink factories environment-dependent` are more or less fine
- however, the last one, `Clean up and simplify changes`, is very strange. Especially the parts that remove code that this PR introduced a couple of commits before. Changes there should be split up and squashed into the appropriate previous commits.
* @param schema Schema of the produced table.
* @param returnType Type information of the produced physical DataStream.
* @param schema Schema of the produced table.
* @param proctimeAttribute Field name of the processing time attribute, null if no
Use `Optional` instead of `null`, or overload, or use a builder, or disallow null. The same applies to classes that extend from this one.
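The `Optional` alternative suggested here could look like the following sketch. The class and field names are illustrative, not the PR's actual code; the point is that callers are forced to handle the absent case explicitly instead of risking a `NullPointerException`:

```java
import java.util.Optional;

// Hedged sketch: expose an optional processing time attribute via
// Optional instead of a nullable String.
class ExampleTableSource {
    private final String proctimeAttribute; // may be null internally

    ExampleTableSource(String proctimeAttribute) {
        this.proctimeAttribute = proctimeAttribute;
    }

    // Callers must unwrap the Optional, so the "no proctime" case
    // cannot be silently ignored.
    Optional<String> getProctimeAttribute() {
        return Optional.ofNullable(proctimeAttribute);
    }
}
```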
* @param proctimeAttribute Field name of the processing time attribute, null if no
* processing time field is defined.
* @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute
* @param fieldMapping Mapping for the fields of the table schema to
Ditto for the `or null` here.
@@ -50,7 +50,8 @@ trait DefinedFieldMapping {
* type. It can also provide a mapping for fields which are not in the [[TableSchema]] to make
* fields in the physical [[TypeInformation]] accessible for a [[TimestampExtractor]].
*
* @return A mapping from [[TableSchema]] fields to [[TypeInformation]] fields.
* @return A mapping from [[TableSchema]] fields to [[TypeInformation]] fields or
*         null if no mapping is necessary.
*/
def getFieldMapping: JMap[String, String]
Annotate with `@Nullable` or change to `Optional`.
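The `@Nullable` variant of this suggestion could look like the sketch below. In Flink this would be `javax.annotation.Nullable`; a local annotation is defined here only to keep the snippet self-contained, and the interface is a simplified stand-in for the Scala trait in the diff:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Map;

// Stand-in for javax.annotation.Nullable, for self-containment only.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Nullable {}

interface DefinedFieldMapping {
    // The annotation documents, and lets static analysis tools check,
    // that callers must expect a null result when no mapping is needed.
    @Nullable
    Map<String, String> getFieldMapping();
}
```

Unlike `Optional`, this keeps the method signature unchanged for existing implementors, which matters for the backward-compatibility concern raised below.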
* @param proctimeAttribute Field name of the processing time attribute, null if no
* processing time field is defined.
* @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute
* @param fieldMapping Mapping for the fields of the table schema to
How can this field ever be null? `SchemaValidator.deriveFieldMapping` doesn't allow for that, does it?
Backward compatibility. It could have been null in the past.
startupMode == that.startupMode &&
Objects.equals(specificStartupOffsets, that.specificStartupOffsets);
}

@Override
public int hashCode() {
return Objects.hash(schema, topic, properties, returnType,
proctimeAttribute, rowtimeAttributeDescriptors, startupMode, specificStartupOffsets);
return Objects.hash(schema, proctimeAttribute, rowtimeAttributeDescriptors, fieldMapping,
nit: format one entry per line
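The requested formatting can be sketched as below; with one field per line, adding or removing a field later produces a one-line diff. The class and fields are illustrative stand-ins for the Kafka table source in the PR:

```java
import java.util.Objects;

// Sketch of the "one entry per line" formatting nit for Objects.hash.
class ExampleKafkaTableSource {
    private final String schema = "schema";
    private final String proctimeAttribute = "proctime";
    private final String fieldMapping = "mapping";

    @Override
    public int hashCode() {
        return Objects.hash(
            schema,
            proctimeAttribute,
            fieldMapping);
    }
}
```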
@@ -465,14 +465,14 @@ abstract class TableEnvironment(val config: TableConfig) {
tableSink: TableSink[_]): Unit

/**
* Registers an external [[TableSink]] which is already configured in this
* [[TableEnvironment]]'s catalog.
* Registers an external [[TableSink]] with already configured field names and field types in
ditto?
}
case t => throw TableException(s"Unknown Table type ${t.getClass}.")
case t: TableSourceSinkTable[_, _] if t.isStreamSourceTable => true
// null
Drop the `// null` comment?
This information is useful.
class TableSourceSinkTable[T1, T2](val tableSourceTableOpt: Option[TableSourceTable[T1]],
val tableSinkTableOpt: Option[TableSinkTable[T2]])
/**
ditto: fixup Shuyi's commit
csvTableSinkBuilder
.build()
val path = params.getString(CONNECTOR_PATH)
ditto fixup
@@ -186,7 +186,7 @@ class TableEnvironmentITCase(
def testInsertIntoMemoryTable(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
MemoryTableSourceSinkUtil.clear
MemoryTableSourceSinkUtil.clear()
ditto, and same applies probably to other changes in this commit :(
Thank you @pnowojski. I hope I could address all your comments. I will clean up the commit history and improve the commit messages during merging.
Force-pushed from b9e0e59 to a4785a9
A couple of minor comments. Thanks for your effort! LGTM
@@ -51,16 +51,10 @@ public void addProperties(DescriptorProperties properties) {
}

public Source toSource() {
final Map<String, String> newProperties = new HashMap<>(properties);
newProperties.replace(TableDescriptorValidator.TABLE_TYPE(),
TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE());
I'm just asking because I cannot find what code has replaced them. Or was there some dead code?
* An empty context means that the factory matches for all requests.
*/
def requiredContext(): util.Map[String, String]
trait TableFormatFactory[T] extends TableFactory {
Maybe in that case deduplicate the comment with an `@see` Javadoc pointer? Otherwise there is a huge risk of the comments drifting out of sync.
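The suggestion could look like the sketch below: the subtype's Javadoc points back to the base contract instead of repeating it, so there is only one place to update. The interfaces are simplified stand-ins for the Flink originals:

```java
import java.util.Map;

// Base contract: the Javadoc lives here and nowhere else.
interface TableFactory {
    /**
     * Specifies the context that this factory has been implemented for.
     * An empty context means that the factory matches for all requests.
     */
    Map<String, String> requiredContext();
}

// Subtype: inherit and link rather than copy the description.
interface TableFormatFactory<T> extends TableFactory {
    /**
     * {@inheritDoc}
     *
     * @see TableFactory#requiredContext()
     */
    @Override
    Map<String, String> requiredContext();
}
```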
* @return the matching factory
*/
def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
Preconditions.checkNotNull(factoryClass)
But this leads to quite some unnecessary code duplication. `checkNotNull(factoryClass)` appears 5 times here, and the same applies to other params. Doing the check only in the place where you actually touch the value would either solve or at least limit this issue.
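The consolidation the reviewer asks for can be sketched as follows. Instead of repeating the null check in every public entry point, validate once in the single internal method that actually dereferences the values. `FactoryService`, `find`, and `findAll` are illustrative names, not the PR's actual Scala code, and the lookup itself is elided:

```java
import java.util.Objects;

// Sketch: one null check at the point of use instead of one per entry point.
class FactoryService {
    static <T> T find(Class<T> factoryClass, String descriptor) {
        return findInternal(factoryClass, descriptor);
    }

    static <T> T findAll(Class<T> factoryClass, String descriptor) {
        return findInternal(factoryClass, descriptor);
    }

    private static <T> T findInternal(Class<T> factoryClass, String descriptor) {
        // The only place the parameters are touched, so the only check needed.
        Objects.requireNonNull(factoryClass, "factoryClass must not be null");
        Objects.requireNonNull(descriptor, "descriptor must not be null");
        // ... actual factory lookup elided ...
        return null;
    }
}
```

Every public overload still fails fast on null input, but the check is written once.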
val plainContext = mutable.Map[String, String]()
plainContext ++= requestedContext
// we remove the version for now until we have the first backwards compatibility case
// with the version we can provide mappings in case the format changes
Can you create a follow-up issue with blocker status for 1.6.0 so that it doesn't slip?
Usually it is very uncommon to define both a batch and a streaming source in the same factory. Separating by environment is a concept that can be found throughout the entire flink-table module because both sources and sinks behave quite differently per environment. This closes apache#6323.
Thanks @pnowojski. Merging this...
What is the purpose of the change
This PR finalizes the efforts done in #6264 and #6201 for having unified interfaces for table sources, table sinks, and table formats. It reduces code duplication and cleans up the code base around factories.
Brief change log

- `org.apache.flink.table.factories.TableFactory`: a common interface for factories
- `org.apache.flink.table.factories.TableFormatFactory`: a specific table factory for formats
- `StreamTableSource`, `StreamTableSink`, `BatchTableSource`, `BatchTableSink`, `DeserializationSchema`, and `SerializationSchema` (both … in SQL Client YAML)

Verifying this change

Does this pull request potentially affect one of the following parts:

- `@Public(Evolving)`: yes

Documentation