[FLINK-12604][table-api][table-planner] Register TableSource/Sink as CatalogTables #8549
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community track review progress.
Please see the Pull Request Review Guide for a full explanation of the review process. The bot tracks the review progress through labels, which are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
TableOperations. We no longer store the DataStream & DataSet as Calcite Tables. We treat them as inline operations. When converting from TableOperations to RelNodes, we directly create a special kind of DataStreamScan/DataSetScan that does not access the catalog.
+1 for 7192f70
* <ol>
* <li>{@code [current-catalog].[current-database].[tablePath]}</li>
* <li>{@code [current-catalog].[tablePath]}</li>
* <li>{@code [tablePath]}</li>
* </ol>
*
* @param tablePath table path to look for
* @return {@link CatalogTableOperation} containing both fully qualified table identifier and its
* {@link TableSchema}.
* @return {@link ResolvedTable} wrapping original table with additional iformation about table path and
nit: information
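For illustration, the lookup order described in the javadoc above can be sketched as a simple candidate-list search. This is a hypothetical stand-alone helper, not the actual resolution code in Flink's catalog manager:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

public class TablePathResolution {

    /**
     * Builds candidate paths in the documented order:
     * 1. [current-catalog].[current-database].[tablePath]
     * 2. [current-catalog].[tablePath]
     * 3. [tablePath]
     */
    static List<String> candidates(String currentCatalog, String currentDatabase, String tablePath) {
        return Arrays.asList(
            currentCatalog + "." + currentDatabase + "." + tablePath,
            currentCatalog + "." + tablePath,
            tablePath);
    }

    /** Returns the first candidate accepted by the (hypothetical) catalog lookup. */
    static Optional<String> resolve(String currentCatalog, String currentDatabase, String tablePath,
                                    Predicate<String> existsInCatalog) {
        return candidates(currentCatalog, currentDatabase, tablePath).stream()
            .filter(existsInCatalog)
            .findFirst();
    }
}
```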
private final TableSink<T2> tableSink;
private final boolean isBatch;

private static final String COMMENT = "A table sink or source backed table.";
nit: baked
I meant "to back", i.e. that this Table is backed by a table sink or source.
Shall I maybe just remove the comment, the same way as we discussed here: #8521 (comment)?
}

@Override
public CatalogBaseTable copy() {
This violates the contract of the method. Is it actually required?
Unfortunately it is. All methods in the Catalog (add, get, alter, etc.) call this method. I don't like it either.
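For context on why the copy contract matters here: catalog operations may mutate the returned table, so copy is expected to be a deep copy, while a table wrapping an arbitrary TableSource/TableSink cannot deep-copy it. A minimal stand-alone sketch of that tension, using a hypothetical type rather than Flink's actual CatalogBaseTable:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a catalog table with mutable properties.
class DemoCatalogTable {
    final Map<String, String> properties;

    DemoCatalogTable(Map<String, String> properties) {
        this.properties = properties;
    }

    // Honors the contract: a deep copy, so callers may mutate the result freely.
    DemoCatalogTable copy() {
        return new DemoCatalogTable(new HashMap<>(this.properties));
    }

    // Violates the contract: returning the same instance means a caller's
    // mutation leaks back into the original table.
    DemoCatalogTable shallowCopy() {
        return this;
    }
}
```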
tableSource,
!connectorTable.isBatch(),
FlinkStatistic.UNKNOWN()))
.orElseThrow(() -> new TableException("Querying sink only table unsupported."));
rephrase: "Catalog table does only support sink operations."?
@@ -425,20 +424,128 @@ abstract class TableEnvImpl(
    "Only tables that belong to this TableEnvironment can be registered.")
}

checkValidTableName(name)
Should we at least add some checks for non-empty strings and not only whitespace? Right now registerTableInternal fails in the argument checks of ObjectPath.
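A minimal sketch of the kind of up-front check being suggested. This is a hypothetical helper, not the actual checkValidTableName implementation:

```java
// Hypothetical validation helper: reject null, empty, and whitespace-only
// names before they reach ObjectPath's argument checks.
public class TableNameChecks {

    static void checkValidTableName(String name) {
        if (name == null || name.trim().isEmpty()) {
            throw new IllegalArgumentException(
                "Table name must not be null, empty, or whitespace-only.");
        }
    }

    static boolean isValid(String name) {
        try {
            checkValidTableName(name);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }
}
```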
val selectedFields: Option[Array[Int]])
extends TableScan(cluster, traitSet, table) {

nit: empty line
@@ -156,17 +156,6 @@ public void testTableRegister() throws Exception {
    compareResultAsText(results, expected);
}

@Test(expected = TableException.class)
public void testIllegalName() throws Exception {
As mentioned earlier, let's keep this test but modify it with other illegal names.
FlinkStatistic.UNKNOWN());

CatalogReader catalogReader = (CatalogReader) relBuilder.getRelOptSchema();
String refId = Integer.toString(System.identityHashCode(tableSourceTable.getTableSource()));
Can you add an explanation of what we are doing here? Btw, should we only use the refId, or should we prefix it for readability, e.g. unregistered_456789?
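For illustration: System.identityHashCode returns a stable per-instance value for the lifetime of the object, so it works as a cheap reference id for an inline (unregistered) table source, and a prefix makes the generated name easier to spot in plans. The unregistered_ prefix here is the reviewer's suggestion, not an existing convention:

```java
public class RefIds {

    // Builds a readable reference id for an unregistered (inline) object,
    // based on its identity hash code. Stable as long as the object lives.
    static String refId(Object inlineSource) {
        return "unregistered_" + System.identityHashCode(inlineSource);
    }
}
```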
/**
 * A {@link CatalogTable} that wraps a {@link TableSource} and/or {@link TableSink}.
 * This allows registering those in a {@link Catalog}. It can not be persisted as the
 * source and/or sink might be inline implementations and not be representable in a
A bit confused by the "might be" here. Are some ConnectorCatalogTables inline implementations, and some are not and can be converted to properties? The exception thrown in toProperties() indicates that all ConnectorCatalogTables cannot be converted to properties.
It says the TableSource might be an inline implementation. If you use TableEnvironment#connect, then theoretically the table source is property-serializable. The other possibility is to use a TableSource explicitly via TableEnvironment#fromTableSource. The ConnectorCatalogTable is therefore always treated as inline, as we don't know which case we are handling.
public class ConnectorCatalogTable<T1, T2> extends AbstractCatalogTable {
    private final TableSource<T1> tableSource;
    private final TableSink<T2> tableSink;
    private final boolean isBatch;
Just want to double check: unlike a persistent catalog table that can be both batch and streaming, a ConnectorCatalogTable can only be either batch or streaming, right?
In the long term this property will not be necessary anymore. However, it is required by how the table environments (batch and streaming ones) currently work.
 * {@link TableSchema} to the {@link org.apache.flink.api.common.typeutils.CompositeType}.
 */
@Internal
public class DataSetTableOperation<E> extends TableOperation {
javadoc for ?
@Internal
public class DataSetTableOperation<E> extends TableOperation {

    private final DataSet<E> dataStream;
Is there a special reason that we named the variable dataStream instead of dataSet?
 * {@link TableSchema} to the {@link org.apache.flink.api.common.typeutils.CompositeType}.
 */
@Internal
public class DataStreamTableOperation<E> extends TableOperation {
javadoc for ?
Thanks for the update @dawidwys. +1 from my side.
What is the purpose of the change

This is the next step in decoupling TableEnvironment from Calcite. It introduces registration of TableSource/Sink as CatalogTables.

Brief change log

This is based on #8521.
- org.apache.flink.table.catalog.ConnectorCatalogTable that wraps TableSource/Sink and is used for registration in TableEnvironment
- org.apache.flink.table.operations.TableSourceTableOperation for reading from an inline TableSource. This is used only when creating a Table from TableEnvironment#fromTableSource
- TableSourceSinkTable, TableSourceTable, TableSinkTable etc.

Verifying this change

- TableEnvironment#fromTableSource behavior: org.apache.flink.table.runtime.stream.table.TableSourceITCase#testInlineCsvTableSource

Does this pull request potentially affect one of the following parts:

- @Public(Evolving): (yes / no)

Documentation