@@ -163,8 +163,7 @@ private String toStagingDir(String finalDir, Configuration conf) throws IOException
return res;
}

- @Override
- public List<String> getPartitionFieldNames() {
+ private List<String> getPartitionFieldNames() {
return catalogTable.getPartitionKeys();
}

@@ -141,11 +141,6 @@ public List<Map<String, String>> getPartitions() {
return partitionList;
}

- @Override
- public List<String> getPartitionFieldNames() {
-   return catalogTable.getPartitionKeys();
- }
-
@Override
public TableSource applyPartitionPruning(List<Map<String, String>> remainingPartitions) {
if (catalogTable.getPartitionKeys() == null || catalogTable.getPartitionKeys().size() == 0) {
@@ -20,16 +20,13 @@

import org.apache.flink.annotation.Experimental;

- import java.util.List;
import java.util.Map;

/**
* An interface for partitionable {@link TableSink}. A partitionable sink can writes
* query results to partitions.
*
- * <p>Partition columns are defined via {@link #getPartitionFieldNames()} and the field names
- * should be sorted in a strict order. And all the partition fields should exist in the
- * {@link TableSink#getTableSchema()}.
+ * <p>Partition columns are defined via catalog table.
*
* <p>For example, a partitioned table named {@code my_table} with a table schema
 * {@code [a INT, b VARCHAR, c DOUBLE, dt VARCHAR, country VARCHAR]} is partitioned on columns
@@ -62,17 +59,6 @@
@Experimental
public interface PartitionableTableSink {

- /**
-  * Gets the partition field names of the table. The partition field names should be sorted in
-  * a strict order, i.e. they have the order as specified in the PARTITION statement in DDL.
-  * This should be an empty set if the table is not partitioned.
-  *
-  * <p>All the partition fields should exist in the {@link TableSink#getTableSchema()}.
-  *
-  * @return partition field names of the table, empty if the table is not partitioned.
-  */
- List<String> getPartitionFieldNames();
-
/**
* Sets the static partition into the {@link TableSink}. The static partition may be partial
* of all partition columns. See the class Javadoc for more details.
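With getPartitionFieldNames() gone from the sink interface, the partition columns are read from the catalog table by whoever drives the sink. A minimal caller-side sketch of that pattern follows; the helper class and its name are invented and not part of this change, while CatalogTable.getPartitionKeys() and PartitionableTableSink.setStaticPartition() are the real APIs involved.

import java.util.List;
import java.util.Map;

import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.sinks.PartitionableTableSink;

// Hypothetical caller-side helper, not part of this PR: the partition columns
// now come from the CatalogTable, the sink only receives the static partition spec.
public final class StaticPartitionApplier {

    private StaticPartitionApplier() {
    }

    public static void applyStaticPartition(
            CatalogTable catalogTable,
            PartitionableTableSink sink,
            Map<String, String> staticPartitionSpec) {
        // Partition columns in DDL order, formerly sink.getPartitionFieldNames().
        List<String> partitionKeys = catalogTable.getPartitionKeys();
        // Sanity check: every statically specified column must be a partition key.
        for (String column : staticPartitionSpec.keySet()) {
            if (!partitionKeys.contains(column)) {
                throw new IllegalArgumentException(
                    "Column '" + column + "' is not a partition key of the table.");
            }
        }
        sink.setStaticPartition(staticPartitionSpec);
    }
}

The check here is only a sanity illustration; in this patch the corresponding validation happens in TableSinkUtils.validateSink, which now receives the partition keys, as seen later in this diff.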
@@ -19,7 +19,6 @@
package org.apache.flink.table.sources;

import org.apache.flink.annotation.Experimental;
- import org.apache.flink.table.sinks.TableSink;

import java.util.List;
import java.util.Map;
@@ -33,7 +32,7 @@
*
* <p>A partition is represented as a {@code Map<String, String>} which maps from partition
 * field name to partition value. Since the map is NOT ordered, the correct order of partition
- * fields should be obtained via {@link #getPartitionFieldNames()}.
+ * fields should be obtained via partition keys of catalog table.
*/
@Experimental
public interface PartitionableTableSource {
@@ -43,17 +42,6 @@ public interface PartitionableTableSource {
*/
List<Map<String, String>> getPartitions();

- /**
-  * Gets the partition field names of the table. The partition field names should be sorted in
-  * a strict order, i.e. they have the order as specified in the PARTITION statement in DDL.
-  * This should be an empty set if the table is not partitioned.
-  *
-  * <p>All the partition fields should exist in the {@link TableSink#getTableSchema()}.
-  *
-  * @return partition field names of the table, empty if the table is not partitioned.
-  */
- List<String> getPartitionFieldNames();
-
/**
* Applies the remaining partitions to the table source. The {@code remainingPartitions} is
* the remaining partitions of {@link #getPartitions()} after partition pruning applied.
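The updated Javadoc above says the order of partition fields now comes from the partition keys of the catalog table rather than from the removed getPartitionFieldNames(). A small hedged sketch of what that means for code consuming getPartitions(); the helper below is hypothetical.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flink.table.catalog.CatalogTable;

// Hypothetical helper, not part of this PR: a partition from getPartitions() is an
// unordered Map<String, String>, so a stable field order is taken from the catalog table.
public final class PartitionOrderUtil {

    private PartitionOrderUtil() {
    }

    public static List<String> toOrderedValues(
            CatalogTable catalogTable,
            Map<String, String> partition) {
        List<String> orderedValues = new ArrayList<>();
        // getPartitionKeys() preserves the order declared in the DDL.
        for (String key : catalogTable.getPartitionKeys()) {
            // Keys absent from this particular partition spec are simply skipped.
            if (partition.containsKey(key)) {
                orderedValues.add(partition.get(key));
            }
        }
        return orderedValues;
    }
}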
@@ -143,7 +143,8 @@ private Table convertConnectorTable(
return new TableSourceTable<>(
tableSource,
isStreamingMode,
- FlinkStatistic.builder().tableStats(tableStats).build());
+ FlinkStatistic.builder().tableStats(tableStats).build(),
+ null);
Contributor:
Will we also have a CatalogTable for such TableSource-wrapped tables in the future? If not, I would suggest changing CatalogTable to Option[CatalogTable] in TableSourceTable.

Member:
Actually, I have some concerns about putting the whole CatalogTable into TableSourceTable. A CatalogTable may contain all the computed columns defined in the DDL, but not all of them are retained in the source; some of them may be applied in the following LogicalProject. Could we only put the information we need into TableSourceTable and TableSinkTable, e.g. List<String> partitionKeys?

Contributor:
I would say let's use the information from CatalogTable more carefully then... I can imagine partition keys are just a starter; we might need more and more information from CatalogTable in the future. CatalogTable is a kind of meta information about the table.

Contributor Author:
Hi @KurtYoung, after temporary table support, every table should have a CatalogTable, so I think we can keep it as a plain field rather than an Option.
Hi @wuchong, I agree with Kurt. Actually, the source may also need to know the computed columns: it needs to know which fields it does not read (except the computed column expressions).

Member (@wuchong, Oct 25, 2019):
> source may also need to know compute columns. It needs to know which fields it does not read. (except compute columns expressions)

Actually, we will retain some expressions in the source (for rowtime generation), so we would then need another field in TableSourceTable to keep that information, say generatedExpressions. Might that be confused with the expressions in CatalogTable?

Contributor Author:
Hi Jark, I don't quite understand the expression handling in the source, but I think we need a good name to clarify it. Maybe it should not be only TableSourceTable that keeps them together? I think they are all attributes of the table.

} else {
Optional<TableSinkTable> tableSinkTable = table.getTableSink()
.map(tableSink -> new TableSinkTable<>(
@@ -180,7 +181,8 @@ private Table convertCatalogTable(ObjectPath tablePath, CatalogTable table) {
return new TableSourceTable<>(
tableSource,
!((StreamTableSource<?>) tableSource).isBounded(),
- FlinkStatistic.UNKNOWN()
+ FlinkStatistic.UNKNOWN(),
+ table
);
}

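The review thread above debates whether TableSourceTable should carry the whole CatalogTable, an Option[CatalogTable], or only the partition keys. Since this patch settles on a nullable field (null is passed for TableSource-wrapped tables), downstream code has to guard against it, which is what PushPartitionIntoTableSourceScanRule does later in this diff. A minimal sketch of that guard; the helper class is invented for illustration.

import java.util.Collections;
import java.util.List;

import org.apache.flink.table.catalog.CatalogTable;

// Hypothetical helper, not part of this PR: null-safe access to the partition keys,
// mirroring the guard the planner rules apply to the new nullable catalogTable field.
public final class CatalogTableAccess {

    private CatalogTableAccess() {
    }

    public static List<String> partitionKeysOrEmpty(CatalogTable catalogTable) {
        if (catalogTable == null || !catalogTable.isPartitioned()) {
            // TableSource-wrapped tables may be registered with a null CatalogTable here.
            return Collections.emptyList();
        }
        return catalogTable.getPartitionKeys();
    }
}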
@@ -354,7 +354,8 @@ public <U> RelNode visit(TableSourceQueryOperation<U> tableSourceOperation) {
names = Collections.singletonList(refId);
}

- TableSourceTable<?> tableSourceTable = new TableSourceTable<>(tableSource, !isBatch, statistic);
+ TableSourceTable<?> tableSourceTable = new TableSourceTable<>(
+   tableSource, !isBatch, statistic, null);
FlinkRelOptTable table = FlinkRelOptTable.create(
relBuilder.getRelOptSchema(),
tableSourceTable.getRowType(relBuilder.getTypeFactory()),
@@ -184,7 +184,8 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
sink.getTraitSet,
newInput,
sink.sink,
- sink.sinkName)
+ sink.sinkName,
+ sink.catalogTable)

case _ =>
throw new TableException(s"Unsupported logical operator: ${other.getClass.getSimpleName}")
@@ -175,12 +175,10 @@ abstract class PlannerBase(
case catalogSink: CatalogSinkModifyOperation =>
val input = getRelBuilder.queryOperation(modifyOperation.getChild).build()
val identifier = catalogManager.qualifyIdentifier(catalogSink.getTablePath: _*)
- getTableSink(identifier).map(sink => {
- TableSinkUtils.validateSink(catalogSink, identifier, sink)
+ getTableSink(identifier).map { case (table, sink) =>
+ TableSinkUtils.validateSink(catalogSink, identifier, sink, table.getPartitionKeys)
sink match {
- case partitionableSink: PartitionableTableSink
-   if partitionableSink.getPartitionFieldNames != null
-   && partitionableSink.getPartitionFieldNames.nonEmpty =>
+ case partitionableSink: PartitionableTableSink =>
Contributor:
See the comment on BatchExecSink: should we just put the validation here? (For a partitioned table, we need to make sure the sink is a PartitionableTableSink.)

Contributor Author:
Yes, we can.

Contributor Author:
I think we have validateSink above, so there is no need to validate here.

partitionableSink.setStaticPartition(catalogSink.getStaticPartitions)
Contributor:
I'm wondering why we set the static partition information here and not during StreamExecSinkRule or BatchExecSinkRule.

Contributor Author (@JingsongLi, Oct 25, 2019):
Maybe just because of PartitionableSinkITCase.testInsertWithStaticPartitions: it gets staticPartitions from the original sink, and in an ExecSinkRule it would be another, copied sink. I think we can modify this in #9796.

case _ =>
}
@@ -192,8 +190,8 @@ abstract class PlannerBase(
s"${classOf[OverwritableTableSink].getSimpleName} but actually got " +
sink.getClass.getName)
}
- LogicalSink.create(input, sink, catalogSink.getTablePath.mkString("."))
- }) match {
+ LogicalSink.create(input, sink, catalogSink.getTablePath.mkString("."), table)
+ } match {
case Some(sinkRel) => sinkRel
case None => throw new TableException(s"Sink ${catalogSink.getTablePath} does not exists")
}
@@ -254,28 +252,31 @@
*/
protected def translateToPlan(execNodes: util.List[ExecNode[_, _]]): util.List[Transformation[_]]

- private def getTableSink(objectIdentifier: ObjectIdentifier): Option[TableSink[_]] = {
- JavaScalaConversionUtil.toScala(catalogManager.getTable(objectIdentifier)) match {
+ private def getTableSink(
+     tableIdentifier: ObjectIdentifier): Option[(CatalogTable, TableSink[_])] = {
+ JavaScalaConversionUtil.toScala(catalogManager.getTable(tableIdentifier)) match {
case Some(s) if s.isInstanceOf[ConnectorCatalogTable[_, _]] =>
- JavaScalaConversionUtil
-   .toScala(s.asInstanceOf[ConnectorCatalogTable[_, _]].getTableSink)
+ val table = s.asInstanceOf[ConnectorCatalogTable[_, _]]
+ JavaScalaConversionUtil.toScala(table.getTableSink) match {
+   case Some(sink) => Some(table, sink)
+   case None => None
+ }

case Some(s) if s.isInstanceOf[CatalogTable] =>

- val catalog = catalogManager.getCatalog(objectIdentifier.getCatalogName)
- val catalogTable = s.asInstanceOf[CatalogTable]
+ val catalog = catalogManager.getCatalog(tableIdentifier.getCatalogName)
+ val table = s.asInstanceOf[CatalogTable]
if (catalog.isPresent && catalog.get().getTableFactory.isPresent) {
- val objectPath = objectIdentifier.toObjectPath
+ val objectPath = tableIdentifier.toObjectPath
val sink = TableFactoryUtil.createTableSinkForCatalogTable(
catalog.get(),
- catalogTable,
+ table,
objectPath)
if (sink.isPresent) {
- return Option(sink.get())
+ return Option(table, sink.get())
}
}
- val sinkProperties = catalogTable.toProperties
- Option(TableFactoryService.find(classOf[TableSinkFactory[_]], sinkProperties)
+ val sinkProperties = table.toProperties
+ Option(table, TableFactoryService.find(classOf[TableSinkFactory[_]], sinkProperties)
Contributor:
Could you explain the difference between TableFactoryUtil.createTableSinkForCatalogTable and TableFactoryService.find(classOf[TableSinkFactory[_]], sinkProperties)?

Contributor Author:
See Catalog.getTableFactory. Option 1 uses Catalog.getTableFactory to obtain a TableFactory and create the sink. Option 2 uses TableFactoryService to create the TableFactory and then the sink. The reason we need option 1 is that the Hive table factory cannot be found by TableFactoryService, but I think this can be improved in the future.

.createTableSink(sinkProperties))

case _ => None
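The last thread above asks how the two sink-creation paths differ. Put side by side, the logic of the new getTableSink looks roughly like the following Java sketch; the helper class is invented and simplified, while the factory calls mirror the ones used in the Scala code above.

import java.util.Map;
import java.util.Optional;

import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.TableFactoryUtil;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.sinks.TableSink;

// Hedged sketch of the two lookup paths discussed above; the real logic lives in the
// Scala getTableSink shown in the diff, this class only illustrates the order of the calls.
public final class SinkLookupSketch {

    private SinkLookupSketch() {
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    public static TableSink<?> createSink(Catalog catalog, ObjectPath path, CatalogTable table) {
        if (catalog.getTableFactory().isPresent()) {
            // Option 1: the catalog supplies its own TableFactory (e.g. the Hive catalog),
            // which TableFactoryService discovery cannot find.
            Optional<TableSink> sink =
                TableFactoryUtil.createTableSinkForCatalogTable(catalog, table, path);
            if (sink.isPresent()) {
                return sink.get();
            }
        }
        // Option 2: discover a TableSinkFactory from the table properties.
        Map<String, String> properties = table.toProperties();
        return TableFactoryService.find(TableSinkFactory.class, properties)
            .createTableSink(properties);
    }
}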
@@ -18,6 +18,7 @@

package org.apache.flink.table.planner.plan.nodes.calcite

+ import org.apache.flink.table.catalog.CatalogTable
import org.apache.flink.table.sinks.TableSink

import org.apache.calcite.plan.{Convention, RelOptCluster, RelTraitSet}
@@ -37,11 +38,13 @@ final class LogicalSink(
traitSet: RelTraitSet,
input: RelNode,
sink: TableSink[_],
- sinkName: String)
+ sinkName: String,
+ val catalogTable: CatalogTable)
extends Sink(cluster, traitSet, input, sink, sinkName) {

override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
- new LogicalSink(cluster, traitSet, inputs.head, sink, sinkName)
+ new LogicalSink(
+   cluster, traitSet, inputs.head, sink, sinkName, catalogTable)
}

}
@@ -50,8 +53,10 @@ object LogicalSink {

def create(input: RelNode,
sink: TableSink[_],
- sinkName: String): LogicalSink = {
+ sinkName: String,
+ catalogTable: CatalogTable = null): LogicalSink = {
val traits = input.getCluster.traitSetOf(Convention.NONE)
- new LogicalSink(input.getCluster, traits, input, sink, sinkName)
+ new LogicalSink(
+   input.getCluster, traits, input, sink, sinkName, catalogTable)
}
}
@@ -18,6 +18,7 @@

package org.apache.flink.table.planner.plan.nodes.logical

+ import org.apache.flink.table.catalog.CatalogTable
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.calcite.{LogicalSink, Sink}
import org.apache.flink.table.sinks.TableSink
@@ -39,12 +40,14 @@ class FlinkLogicalSink(
traitSet: RelTraitSet,
input: RelNode,
sink: TableSink[_],
- sinkName: String)
+ sinkName: String,
+ val catalogTable: CatalogTable)
extends Sink(cluster, traitSet, input, sink, sinkName)
with FlinkLogicalRel {

override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
- new FlinkLogicalSink(cluster, traitSet, inputs.head, sink, sinkName)
+ new FlinkLogicalSink(
+   cluster, traitSet, inputs.head, sink, sinkName, catalogTable)
}

}
@@ -62,7 +65,8 @@ private class FlinkLogicalSinkConverter
FlinkLogicalSink.create(
newInput,
sink.sink,
- sink.sinkName)
+ sink.sinkName,
+ sink.catalogTable)
}
}

@@ -72,9 +76,11 @@ object FlinkLogicalSink {
def create(
input: RelNode,
sink: TableSink[_],
- sinkName: String): FlinkLogicalSink = {
+ sinkName: String,
+ catalogTable: CatalogTable = null): FlinkLogicalSink = {
val cluster = input.getCluster
val traitSet = cluster.traitSetOf(FlinkConventions.LOGICAL).simplify()
- new FlinkLogicalSink(cluster, traitSet, input, sink, sinkName)
+ new FlinkLogicalSink(
+   cluster, traitSet, input, sink, sinkName, catalogTable)
}
}
@@ -151,7 +151,10 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule(
FlinkStatistic.builder().statistic(statistic).tableStats(null).build()
}
val newTableSourceTable = new TableSourceTable(
- newTableSource, tableSourceTable.isStreamingMode, newStatistic)
+ newTableSource,
+ tableSourceTable.isStreamingMode,
+ newStatistic,
+ tableSourceTable.catalogTable)
relOptTable.copy(newTableSourceTable, tableSourceTable.getRowType(typeFactory))
}

@@ -18,17 +18,18 @@

package org.apache.flink.table.planner.plan.rules.logical

+ import org.apache.flink.table.api.TableException
import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkTypeFactory}
import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, TableSourceTable}
import org.apache.flink.table.planner.plan.stats.FlinkStatistic
import org.apache.flink.table.planner.plan.utils.{FlinkRelOptUtil, PartitionPruner, RexNodeExtractor}
import org.apache.flink.table.sources.PartitionableTableSource

import org.apache.calcite.plan.RelOptRule.{none, operand}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.core.Filter
import org.apache.calcite.rel.logical.LogicalTableScan
import org.apache.calcite.rex.{RexInputRef, RexNode, RexShuttle}
- import org.apache.flink.table.api.TableException

import scala.collection.JavaConversions._

@@ -49,11 +50,9 @@

val scan: LogicalTableScan = call.rel(1)
scan.getTable.unwrap(classOf[TableSourceTable[_]]) match {
- case table: TableSourceTable[_] =>
-   table.tableSource match {
-     case p: PartitionableTableSource => p.getPartitionFieldNames.nonEmpty
-     case _ => false
-   }
+ case table: TableSourceTable[_] => table.catalogTable != null &&
+   table.catalogTable.isPartitioned &&
+   table.tableSource.isInstanceOf[PartitionableTableSource]
case _ => false
}
}
@@ -62,18 +61,12 @@
val filter: Filter = call.rel(0)
val scan: LogicalTableScan = call.rel(1)
val table: FlinkRelOptTable = scan.getTable.asInstanceOf[FlinkRelOptTable]
- pushPartitionIntoScan(call, filter, scan, table)
- }
-
- private def pushPartitionIntoScan(
-   call: RelOptRuleCall,
-   filter: Filter,
-   scan: LogicalTableScan,
-   relOptTable: FlinkRelOptTable): Unit = {
+ val tableSourceTable = table.unwrap(classOf[TableSourceTable[_]])
+
+ val partitionFieldNames = tableSourceTable.catalogTable.getPartitionKeys.toSeq.toArray[String]
+
- val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
val tableSource = tableSourceTable.tableSource.asInstanceOf[PartitionableTableSource]
- val partitionFieldNames = tableSource.getPartitionFieldNames.toList.toArray
val inputFieldType = filter.getInput.getRowType

val relBuilder = call.builder()
@@ -131,8 +124,11 @@
FlinkStatistic.builder().statistic(statistic).tableStats(null).build()
}
val newTableSourceTable = new TableSourceTable(
- newTableSource, tableSourceTable.isStreamingMode, newStatistic)
- val newRelOptTable = relOptTable.copy(newTableSourceTable, relOptTable.getRowType)
+ newTableSource,
+ tableSourceTable.isStreamingMode,
+ newStatistic,
+ tableSourceTable.catalogTable)
+ val newRelOptTable = table.copy(newTableSourceTable, table.getRowType)

val newScan = new LogicalTableScan(scan.getCluster, scan.getTraitSet, newRelOptTable)
// check whether framework still need to do a filter
@@ -98,7 +98,8 @@ class PushProjectIntoTableSourceScanRule extends RelOptRule(
newTableSource,
tableSourceTable.isStreamingMode,
tableSourceTable.statistic,
- Option(usedFields))
+ Option(usedFields),
+ tableSourceTable.catalogTable)
// row type is changed after project push down
val newRowType = newTableSourceTable.getRowType(scan.getCluster.getTypeFactory)
val newRelOptTable = relOptTable.copy(newTableSourceTable, newRowType)