[FLINK-14381][table] Partition field names should be got from CatalogTable instead of source/sink #9909
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request.

Automated Checks: last check on commit 6a82d9f (Wed Dec 04 14:47:23 UTC 2019). Warnings:
Mention the bot in a comment to re-run the automated checks.

Review Progress: please see the Pull Request Review Guide for a full explanation of the review process. The bot tracks the review progress through labels, which are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands: the @flinkbot bot supports the following commands:
Thanks for your PR. I left some comments.
btw: the last 3 commits have a wrong commit message (table-planner-planner -> table-planner-blink).
Force-pushed from f20e67a to 3c1480f.
Thanks for your review, updated.
Force-pushed from 3c1480f to e3ba6b1.
LGTM now!
@wuchong @KurtYoung Can you take a look?
But considering future needs, we may need to get some other information from the catalog table or the catalog; we don't need to pass all of that information to …
Consider a temporary partitioned table; I'm not sure such a table would also be stored in … Another solution is that we can pass in the …
My gut feeling is we should pass in all the required information on `CatalogTable` to `LogicalSink`; other information we can access via `Catalog`. I think this is also the way for source, i.e. watermark, computed column, partition.
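As a rough, self-contained sketch of that direction (the types below are simplified stand-ins invented for illustration, not the real planner classes):

```scala
// Simplified stand-ins, not the actual Flink planner types.
// Idea: the sink node carries the CatalogTable it was created from, so planner
// rules can read table metadata (e.g. partition keys) from the node itself
// instead of asking the TableSink for it.
case class CatalogTableSketch(partitionKeys: Seq[String], properties: Map[String, String])

case class LogicalSinkSketch(sinkName: String, catalogTable: CatalogTableSketch) {
  // partition keys come from the table metadata, not from the sink implementation
  def partitionKeys: Seq[String] = catalogTable.partitionKeys
}
```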
According to #9971, temporary tables are stored in …
If you want, you can pass both the table identifier as well as …
Hi @KurtYoung @wuchong, can you explain more? Why do we want to reduce unnecessary access to the catalog manager?
3 minor reasons; none of them are critical but they kind of bother me:
Thanks @KurtYoung for explaining. None of them are critical but they do convince me; I'll update it. But I think I can keep passing the identifier and the catalog manager, what do you think?
You mean catalog table, not catalog manager, right?
I mean passing the identifier and catalog table to the logical sink and logical source, and keeping the catalog manager in FlinkContext.
I'm OK with the table identifier and catalog table, but let's not introduce the catalog manager to FlinkContext yet. You don't need it in this PR, right?
OK, I'll remove the catalog manager and identifier.
Force-pushed from e3ba6b1 to e5c0586.
I think I might have found some logic flaws in the old design. Let me know whether it makes sense to you.
isStreamingMode,
FlinkStatistic.builder().tableStats(tableStats).build());
FlinkStatistic.builder().tableStats(tableStats).build(),
null);
Will we also have a `CatalogTable` for such TableSource-wrapped tables in the future? If not, I would suggest changing `CatalogTable` to `Option[CatalogTable]` in `TableSourceTable`.
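A minimal sketch of that suggestion, assuming placeholder types rather than the real `TableSourceTable`:

```scala
// Placeholder types for illustration only.
case class CatalogTableSketch(partitionKeys: Seq[String])

// A table registered via DDL would carry its metadata; a table wrapped directly
// around a TableSource object might not, hence the Option.
case class TableSourceTableSketch(
    sourceName: String,
    catalogTable: Option[CatalogTableSketch]) {
  def partitionKeys: Seq[String] = catalogTable.map(_.partitionKeys).getOrElse(Nil)
}
```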
Actually, I have some concerns about putting the whole `CatalogTable` in `TableSourceTable`, because the `CatalogTable` may contain all the computed columns defined in the DDL, but not all of them are retained in the source; some of them may be applied in the following `LogicalProject`. Could we only put the information we need in `TableSourceTable` and `TableSinkTable`? e.g. `List<String> partitionKeys`.
I would say let's use the information from CatalogTable more carefully then... I can imagine partition keys are just a starter, we might need more and more information from CatalogTable in the future. CatalogTable is some kind of meta information about the table.
Hi @KurtYoung, after temporary table support, every table should have a `CatalogTable`. I think we can keep it as a non-Option.
Hi @wuchong, I agree with Kurt. Actually, the source may also need to know the computed columns: it needs to know which fields it does not read (except the computed column expressions).
> source may also need to know computed columns. It needs to know which fields it does not read (except computed column expressions).

Actually, we will retain some expressions in the source (for rowtime generation), so we would have to add another field in `TableSourceTable` to keep this information, say `generatedExpressions`; might this be confused with the expressions in `CatalogTable`?
Hi Jark, I don't quite understand the expressions in this source, but I think we need a good name to clarify it. Maybe it should not just be `TableSourceTable` that keeps them together? I think they are all attributes of the table.
getTableSink(identifier).map(sink => {
  TableSinkUtils.validateSink(catalogSink, identifier, sink)
  val partKeys =
    catalogManager.getTable(identifier).get().asInstanceOf[CatalogTable].getPartitionKeys
Should we check the table type before casting to `CatalogTable`? For example, it could be a `CatalogView` when we get the table from the catalog manager.
`getTableSink(identifier)` already has this check; I will change the return type of `getTableSink` and get the `CatalogTable` directly.
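For illustration, a hedged sketch of that shape (placeholder types, not the real `CatalogManager` or `TableSinkFactory` API): only a catalog table produces a sink and its metadata travels with it, while a view falls through.

```scala
// Placeholder types; not the real Flink classes.
sealed trait CatalogBaseTableSketch
final case class CatalogTableSketch(partitionKeys: Seq[String]) extends CatalogBaseTableSketch
case object CatalogViewSketch extends CatalogBaseTableSketch

trait TableSinkSketch

def getTableSink(
    lookup: String => Option[CatalogBaseTableSketch],
    createSink: CatalogTableSketch => TableSinkSketch)(
    identifier: String): Option[(CatalogTableSketch, TableSinkSketch)] =
  lookup(identifier) match {
    // only a table yields a sink, and its CatalogTable is returned alongside it
    case Some(table: CatalogTableSketch) => Some((table, createSink(table)))
    // views or missing entries produce no sink here
    case _ => None
  }
```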
// translate the Table into a DataSet and provide the type that the TableSink expects.
val result: DataSet[T] = translate(table)(outputType)
// Give the DataSet to the TableSink to emit it.
batchSink.emitDataSet(shuffleByPartitionFieldsIfNeeded(batchSink, result))
Why delete this logic?
First, the former logic added a shuffle regardless of whether the partition was static or not.
Second, after we remove `getPartitionFieldNames` from `PartitionableTableSink`, we can no longer get the partition fields here. At present, I don't want to optimize dynamic partition shuffling in the legacy planner again.
case partitionableSink: PartitionableTableSink
  if partitionableSink.getPartitionFieldNames != null
    && partitionableSink.getPartitionFieldNames.nonEmpty =>
case partitionableSink: PartitionableTableSink =>
See the comment on `BatchExecSink`; should we just put the validation here? (For a partitioned table, we need to make sure the sink is a `PartitionableTableSink`.)
Yes, we can.
I think we have `validateSink` above; no need to validate here.
…support in legacy planner
…le to test partition source and sink
Rebased to resolve conflicts and addressed the comments.
Force-pushed from e5c0586 to 4c59ed7.
Two commit messages should be adjusted: we do not get partition keys from the catalog manager, but from the catalog table.
private def getTableSink(objectIdentifier: ObjectIdentifier): Option[TableSink[_]] = {
  JavaScalaConversionUtil.toScala(catalogManager.getTable(objectIdentifier)) match {
private def getTableSink(identifier: ObjectIdentifier): Option[(CatalogTable, TableSink[_])] = {
identifier -> tableIdentifier
val tableSourceTable = table.unwrap(classOf[TableSourceTable[_]])

if (!tableSourceTable.tableSource.isInstanceOf[PartitionableTableSource]) {
  throw new TableException(s"Table(${table.getQualifiedName}) with partition keys" +
If the table is partitioned but we have a non-`PartitionableTableSource`, couldn't we just skip partition pruning and read the whole data instead? Throwing an exception doesn't seem right.
Although I don't think it would be used like this, we can do that.
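A tiny sketch of that alternative (placeholder traits, not the actual rule): the pruning rule simply does not match, so the scan is left untouched and the whole table is read.

```scala
// Placeholder traits for illustration.
trait TableSourceSketch
trait PartitionableTableSourceSketch extends TableSourceSketch

// Instead of throwing, the rule only fires when the source can handle pruning.
def pruningRuleMatches(source: TableSourceSketch, tableIsPartitioned: Boolean): Boolean =
  tableIsPartitioned && source.isInstanceOf[PartitionableTableSourceSketch]
```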
  }
}
case _ => throw new TableException(
  s"Table(${sinkNode.sinkName}) with partition keys should be a PartitionableTableSink.")
We need PartitionableTableSink to write data to partitioned table: $tableName
sinkNode.sink match {
  case partitionSink: PartitionableTableSink =>
    val partKeys = sinkNode.catalogTable.getPartitionKeys
    if (!partKeys.isEmpty) {
Can we assert that the partition keys are non-empty?
`isPartitioned` already checks it; I should remove this check.
if (!partKeys.isEmpty) {
  val partitionIndices =
    partKeys.map(partitionSink.getTableSchema.getFieldNames.indexOf(_))
  // validate
We don't have to validate again, do we? We can move all the validation logic to `TableSinkUtils:validate`?
`partKeys` and the `TableSchema` come from the catalog manager and are already validated by it, so I think I can remove this. (They do not come from the sink or source.)
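As a side note, a minimal sketch of the kind of check being discussed here (a hypothetical helper, not the real `TableSinkUtils` API): every partition key must appear in the table schema.

```scala
// Hypothetical helper for illustration.
def validatePartitionKeys(partitionKeys: Seq[String], schemaFieldNames: Seq[String]): Unit =
  partitionKeys.foreach { key =>
    require(schemaFieldNames.contains(key), s"Partition key '$key' must be in the schema.")
  }
```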
requiredTraitSet = requiredTraitSet.plus(
  FlinkRelDistribution.hash(partitionIndices
    .map(Integer.valueOf), requireStrict = false))
requiredTraitSet = requiredTraitSet.plus(
Move this into `if (partitionSink.configurePartitionGrouping(true)) {`.
Now the hash shuffle will be added anyway; `configurePartitionGrouping` only controls the sort.
In streaming mode, the hash shuffle is added, but not the sort, regardless of `configurePartitionGrouping`.
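To make that behaviour concrete, a sketch under assumptions (simplified stand-ins for the distribution and collation traits, not the real Calcite/Flink types): the hash distribution on the partition keys is always required, and the sort is added only when the sink accepts grouped input.

```scala
// Simplified stand-ins for illustration; not FlinkRelDistribution / RelCollation.
sealed trait TraitSketch
final case class HashDistribution(keyIndices: Seq[Int]) extends TraitSketch
final case class SortCollation(keyIndices: Seq[Int]) extends TraitSketch

def requiredTraits(partitionIndices: Seq[Int], sinkAcceptsGrouping: Boolean): Seq[TraitSketch] = {
  // always shuffle by the partition keys so each partition lands on one task
  val base = Seq[TraitSketch](HashDistribution(partitionIndices))
  // sort by the partition keys only if the sink asked for grouped input
  if (sinkAcceptsGrouping) base :+ SortCollation(partitionIndices) else base
}
```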
s"${partitionFields.get(idx)} must be in the schema.") | ||
} | ||
} | ||
if (sinkNode.catalogTable != null && sinkNode.catalogTable.isPartitioned) { |
same comments as BatchExecSinkRule
val sinkProperties = catalogTable.toProperties
Option(TableFactoryService.find(classOf[TableSinkFactory[_]], sinkProperties)
val sinkProperties = table.toProperties
Option(table, TableFactoryService.find(classOf[TableSinkFactory[_]], sinkProperties)
Could you explain what the difference is between `TableFactoryUtil.createTableSinkForCatalogTable` and `TableFactoryService.find(classOf[TableSinkFactory[_]], sinkProperties)`?
See `Catalog.getTableFactory`.
Option 1 uses `Catalog.getTableFactory` to get a `TableFactory` and create the sink.
Option 2 uses `TableFactoryService` to create a `TableFactory` and create the sink.
The reason we need option 1 is that the Hive table factory cannot be found by `TableFactoryService`, but I think this can be improved in the future.
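A hedged sketch of the two paths (placeholder types; the names are illustrative, not the real Flink API): prefer the factory provided by the catalog itself, and fall back to service discovery when the catalog does not provide one.

```scala
// Placeholder types for illustration.
trait TableSinkSketch
trait TableFactorySketch {
  def createSink(properties: Map[String, String]): TableSinkSketch
}

def createCatalogSink(
    catalogFactory: Option[TableFactorySketch],                 // option 1: from the catalog (e.g. Hive)
    discoverFactory: Map[String, String] => TableFactorySketch, // option 2: service discovery
    properties: Map[String, String]): TableSinkSketch = {
  val factory = catalogFactory.getOrElse(discoverFactory(properties))
  factory.createSink(properties)
}
```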
…ble for PartitionableTableSink
Force-pushed from 4c59ed7 to ca20ef1.
Thanks for your review, updated.
I have some final comments; the old logic is really a mess.
val partitionFieldNames = tableSourceTable.catalogTable.getPartitionKeys.toSeq.toArray[String]

if (!partitionFieldNames.isEmpty) {
I think we don't need this, and it's a really big `if` statement.
Missed this one.
  if partitionableSink.getPartitionFieldNames != null
    && partitionableSink.getPartitionFieldNames.nonEmpty =>
case partitionableSink: PartitionableTableSink =>
  partitionableSink.setStaticPartition(catalogSink.getStaticPartitions)
I'm wondering why we set the static partition information here and not during `StreamExecSinkRule` or `BatchExecSinkRule`.
Maybe just because of the test in `PartitionableSinkITCase.testInsertWithStaticPartitions`: it gets `staticPartitions` from the original `Sink`; if we did this in the `ExecSinkRule`s, it would be another copied sink.
I think we can modify this in #9796.
…ble for PartitionableTableSource
…artitionableTableSource and PartitionableTableSink
Force-pushed from ca20ef1 to 6a82d9f.
LGTM now, +1
What is the purpose of the change
`PartitionableTableSource` and `PartitionableTableSink` currently have a `getPartitionFieldNames` method. This should be removed, and planner rules should get the partition field names from the `CatalogTable` instead.
The partition field names are information about the table; the source/sink should only be fed such information, not asked for it.
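As a rough before/after sketch of that direction (simplified, hypothetical interfaces rather than the exact Flink ones):

```scala
import java.util.{List => JList, Map => JMap}

// Before: the sink exposes the partition field names itself.
trait PartitionableSinkBefore {
  def getPartitionFieldNames: JList[String] // removed by this change
  def setStaticPartition(partitions: JMap[String, String]): Unit
}

// After: the sink is only fed information; planner rules obtain the partition
// keys from the CatalogTable instead of asking the sink.
trait PartitionableSinkAfter {
  def setStaticPartition(partitions: JMap[String, String]): Unit
}
```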
Brief change log
See commits.
Verifying this change
This change is already covered by existing tests.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation