diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/ColumnStats.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/ColumnStats.java
index 19bd3f12de851..a88bea5a5c839 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/ColumnStats.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/ColumnStats.java
@@ -22,6 +22,8 @@
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
+import java.util.function.BinaryOperator;
 
 /**
  * Column statistics.
@@ -217,6 +219,92 @@ public ColumnStats copy() {
 		}
 	}
 
+	/**
+	 * Merges two column stats.
+	 * If either of the two stats is unknown, the merged stat is unknown as well.
+	 * A column statistic is unknown when its value is null.
+	 *
+	 * @param other The other column stats to merge.
+	 * @return The merged column stats.
+	 */
+	public ColumnStats merge(ColumnStats other) {
+		Long ndv = combineIfNonNull(Long::sum, this.ndv, other.ndv);
+		Long nullCount = combineIfNonNull(Long::sum, this.nullCount, other.nullCount);
+		Double avgLen = combineIfNonNull((a1, a2) -> (a1 + a2) / 2, this.avgLen, other.avgLen);
+		Integer maxLen = combineIfNonNull(Math::max, this.maxLen, other.maxLen);
+
+		Number maxValue = combineIfNonNull(
+			(n1, n2) -> n1.doubleValue() > n2.doubleValue() ? n1 : n2,
+			this.maxValue,
+			other.maxValue);
+		Number minValue = combineIfNonNull(
+			(n1, n2) -> n1.doubleValue() < n2.doubleValue() ? n1 : n2,
+			this.minValue,
+			other.minValue);
+
+		@SuppressWarnings("unchecked")
+		Comparable max = combineIfNonNull(
+			(c1, c2) -> ((Comparable) c1).compareTo(c2) > 0 ? c1 : c2,
+			this.max,
+			other.max);
+		@SuppressWarnings("unchecked")
+		Comparable min = combineIfNonNull(
+			(c1, c2) -> ((Comparable) c1).compareTo(c2) < 0 ? c1 : c2,
+			this.min,
+			other.min);
+
+		if (max != null || min != null) {
+			return new ColumnStats(
+				ndv,
+				nullCount,
+				avgLen,
+				maxLen,
+				max,
+				min
+			);
+		} else {
+			return new ColumnStats(
+				ndv,
+				nullCount,
+				avgLen,
+				maxLen,
+				maxValue,
+				minValue
+			);
+		}
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		ColumnStats that = (ColumnStats) o;
+		return Objects.equals(ndv, that.ndv) &&
+			Objects.equals(nullCount, that.nullCount) &&
+			Objects.equals(avgLen, that.avgLen) &&
+			Objects.equals(maxLen, that.maxLen) &&
+			Objects.equals(maxValue, that.maxValue) &&
+			Objects.equals(max, that.max) &&
+			Objects.equals(minValue, that.minValue) &&
+			Objects.equals(min, that.min);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(ndv, nullCount, avgLen, maxLen, maxValue, max, minValue, min);
+	}
+
+	private static <T> T combineIfNonNull(BinaryOperator<T> op, T t1, T t2) {
+		if (t1 == null || t2 == null) {
+			return null;
+		}
+		return op.apply(t1, t2);
+	}
+
 	/**
 	 * ColumnStats builder.
 	 */
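
A small self-contained illustration of the per-field null handling in the new ColumnStats#merge (illustration only, not part of the patch; the values mirror testMergeColumnStatsUnknown further down, and the class name is invented for this sketch):

import org.apache.flink.table.plan.stats.ColumnStats;

public class ColumnStatsMergeSketch {
	public static void main(String[] args) {
		// Constructor arguments: (ndv, nullCount, avgLen, maxLen, maxValue, minValue).
		ColumnStats known = new ColumnStats(4L, 5L, 2D, 3, 15, 2);
		ColumnStats noNullCount = new ColumnStats(4L, null, 2D, 3, 15, 2);

		// Known fields are combined (ndv and nullCount summed, avgLen averaged,
		// maxLen/maxValue maxed, minValue minned); a field that is null on either
		// side stays null in the merged result.
		ColumnStats merged = known.merge(noNullCount);

		// merged equals new ColumnStats(8L, null, 2D, 3, 15, 2)
		System.out.println(merged.getNdv() + ", " + merged.getNullCount());
	}
}

Summing ndv is exact only when the two sides do not share values (e.g. disjoint partitions); otherwise the merged ndv overestimates the distinct count.
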
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/TableStats.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/TableStats.java
index 453593ac65847..1c7904da83da3 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/TableStats.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/TableStats.java
@@ -20,14 +20,23 @@
 import org.apache.flink.annotation.PublicEvolving;
 
+import javax.annotation.Nonnull;
+
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * Table statistics.
  */
 @PublicEvolving
 public final class TableStats {
+
+	/**
+	 * Definition of unknown table stats:
+	 * an unknown {@link #rowCount} is -1;
+	 * unknown {@link #colStats} are simply not present in the map.
+	 */
 	public static final TableStats UNKNOWN = new TableStats(-1, new HashMap<>());
 
 	/**
@@ -69,4 +78,54 @@ public TableStats copy() {
 		return copy;
 	}
 
+	/**
+	 * Merges two table stats.
+	 * If a statistic is unknown on either side, the merged statistic is unknown as well.
+	 * See {@link #UNKNOWN}.
+	 *
+	 * @param other The other table stats to merge.
+	 * @return The merged table stats.
+	 */
+	@Nonnull
+	public TableStats merge(TableStats other) {
+		Map<String, ColumnStats> colStats = new HashMap<>();
+		for (Map.Entry<String, ColumnStats> entry : this.colStats.entrySet()) {
+			String col = entry.getKey();
+			ColumnStats stats = entry.getValue();
+			ColumnStats otherStats = other.colStats.get(col);
+			if (otherStats != null) {
+				colStats.put(col, stats.merge(otherStats));
+			}
+		}
+		return new TableStats(
+			this.rowCount >= 0 && other.rowCount >= 0 ?
+				this.rowCount + other.rowCount : UNKNOWN.rowCount,
+			colStats);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		TableStats that = (TableStats) o;
+		return rowCount == that.rowCount &&
+			Objects.equals(colStats, that.colStats);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(rowCount, colStats);
+	}
+
+	@Override
+	public String toString() {
+		return "TableStats{" +
+			"rowCount=" + rowCount +
+			", colStats=" + colStats +
+			'}';
+	}
 }
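
At the table level, a sketch of how per-partition statistics can be folded with the new TableStats#merge (illustration only, not part of the patch; the values are taken from TableStatsTest below and the names are invented). This is the folding PushPartitionIntoTableSourceScanRule further down performs on the per-partition statistics it reads from the catalog:

import org.apache.flink.table.plan.stats.ColumnStats;
import org.apache.flink.table.plan.stats.TableStats;

import java.util.HashMap;
import java.util.Map;

public class TableStatsMergeSketch {
	public static void main(String[] args) {
		Map<String, ColumnStats> colStatsA = new HashMap<>();
		colStatsA.put("a", new ColumnStats(4L, 5L, 2D, 3, 15, 2));
		TableStats partitionA = new TableStats(30, colStatsA);

		Map<String, ColumnStats> colStatsB = new HashMap<>();
		colStatsB.put("a", new ColumnStats(3L, 15L, 12D, 23, 35, 6));
		TableStats partitionB = new TableStats(32, colStatsB);

		// Row counts are summed only when both sides are known (>= 0); column stats are
		// merged per column, and columns missing on either side are dropped from the result.
		TableStats merged = partitionA.merge(partitionB);
		// merged equals new TableStats(62, {"a" -> new ColumnStats(7L, 20L, 7D, 23, 35, 2)})

		// Unknown stats are sticky: merging with UNKNOWN keeps the row count at -1.
		TableStats stillUnknown = TableStats.UNKNOWN.merge(partitionB);
		System.out.println(merged + " / " + stillUnknown.getRowCount());
	}
}
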
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/plan/stats/TableStatsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/plan/stats/TableStatsTest.java
new file mode 100644
index 0000000000000..c92d926b63d0b
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/plan/stats/TableStatsTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.stats;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test for {@link TableStats}.
+ */
+public class TableStatsTest {
+
+	@Test
+	public void testMerge() {
+		Map<String, ColumnStats> colStats1 = new HashMap<>();
+		colStats1.put("a", new ColumnStats(4L, 5L, 2D, 3, 15, 2));
+		TableStats stats1 = new TableStats(30, colStats1);
+
+		Map<String, ColumnStats> colStats2 = new HashMap<>();
+		colStats2.put("a", new ColumnStats(3L, 15L, 12D, 23, 35, 6));
+		TableStats stats2 = new TableStats(32, colStats2);
+
+		Map<String, ColumnStats> colStatsMerge = new HashMap<>();
+		colStatsMerge.put("a", new ColumnStats(7L, 20L, 7D, 23, 35, 2));
+		Assert.assertEquals(new TableStats(62, colStatsMerge), stats1.merge(stats2));
+	}
+
+	@Test
+	public void testMergeLackColumnStats() {
+		Map<String, ColumnStats> colStats1 = new HashMap<>();
+		colStats1.put("a", new ColumnStats(4L, 5L, 2D, 3, 15, 2));
+		colStats1.put("b", new ColumnStats(4L, 5L, 2D, 3, 15, 2));
+		TableStats stats1 = new TableStats(30, colStats1);
+
+		Map<String, ColumnStats> colStats2 = new HashMap<>();
+		colStats2.put("a", new ColumnStats(3L, 15L, 12D, 23, 35, 6));
+		TableStats stats2 = new TableStats(32, colStats2);
+
+		Map<String, ColumnStats> colStatsMerge = new HashMap<>();
+		colStatsMerge.put("a", new ColumnStats(7L, 20L, 7D, 23, 35, 2));
+		Assert.assertEquals(new TableStats(62, colStatsMerge), stats1.merge(stats2));
+	}
+
+	@Test
+	public void testMergeUnknownRowCount() {
+		TableStats stats1 = new TableStats(-1, new HashMap<>());
+		TableStats stats2 = new TableStats(32, new HashMap<>());
+		Assert.assertEquals(new TableStats(-1, new HashMap<>()), stats1.merge(stats2));
+
+		stats1 = new TableStats(-1, new HashMap<>());
+		stats2 = new TableStats(-1, new HashMap<>());
+		Assert.assertEquals(new TableStats(-1, new HashMap<>()), stats1.merge(stats2));
+
+		stats1 = new TableStats(-3, new HashMap<>());
+		stats2 = new TableStats(-2, new HashMap<>());
+		Assert.assertEquals(new TableStats(-1, new HashMap<>()), stats1.merge(stats2));
+	}
+
+	@Test
+	public void testMergeColumnStatsUnknown() {
+		ColumnStats columnStats0 = new ColumnStats(4L, 5L, 2D, 3, 15, 2);
+		ColumnStats columnStats1 = new ColumnStats(4L, null, 2D, 3, 15, 2);
+		ColumnStats columnStats2 = new ColumnStats(4L, 5L, 2D, null, 15, 2);
+		ColumnStats columnStats3 = new ColumnStats(null, 5L, 2D, 3, 15, 2);
+		ColumnStats columnStats4 = new ColumnStats(4L, 5L, 2D, 3, null, 2);
+		ColumnStats columnStats5 = new ColumnStats(4L, 5L, 2D, 3, 15, null);
+		ColumnStats columnStats6 = new ColumnStats(4L, 5L, null, 3, 15, 2);
+
+		Assert.assertEquals(new ColumnStats(8L, null, 2D, 3, 15, 2), columnStats0.merge(columnStats1));
+		Assert.assertEquals(new ColumnStats(8L, 10L, 2D, null, 15, 2), columnStats0.merge(columnStats2));
+		Assert.assertEquals(new ColumnStats(null, 10L, 2D, 3, 15, 2), columnStats0.merge(columnStats3));
+		Assert.assertEquals(new ColumnStats(8L, 10L, 2D, 3, null, 2), columnStats0.merge(columnStats4));
+		Assert.assertEquals(new ColumnStats(8L, 10L, 2D, 3, 15, null), columnStats0.merge(columnStats5));
+		Assert.assertEquals(new ColumnStats(8L, 10L, null, 3, 15, 2), columnStats0.merge(columnStats6));
+		Assert.assertEquals(new ColumnStats(8L, 10L, null, 3, 15, 2), columnStats6.merge(columnStats6));
+	}
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReader.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReader.java
index f906440195770..7552714cf8c99 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReader.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReader.java @@ -23,6 +23,7 @@ import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogView; import org.apache.flink.table.catalog.ConnectorCatalogTable; +import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.QueryOperationCatalogView; import org.apache.flink.table.planner.catalog.CatalogSchemaTable; import org.apache.flink.table.planner.catalog.QueryOperationCatalogViewTable; @@ -109,8 +110,8 @@ private static FlinkPreparingTableBase toPreparingTable( ConnectorCatalogTable connectorTable = (ConnectorCatalogTable) baseTable; if ((connectorTable).getTableSource().isPresent()) { return convertSourceTable(relOptSchema, - names, rowType, + schemaTable.getTableIdentifier(), connectorTable, schemaTable.getStatistic(), schemaTable.isStreamingMode()); @@ -155,8 +156,8 @@ private static FlinkPreparingTableBase convertCatalogView( private static FlinkPreparingTableBase convertSourceTable( RelOptSchema relOptSchema, - List names, RelDataType rowType, + ObjectIdentifier tableIdentifier, ConnectorCatalogTable table, FlinkStatistic statistic, boolean isStreamingMode) { @@ -173,7 +174,7 @@ private static FlinkPreparingTableBase convertSourceTable( return new TableSourceTable<>( relOptSchema, - names, + tableIdentifier, rowType, statistic, tableSource, diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java index e37d152f14f30..6b2d6f37896c1 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java @@ -22,9 +22,11 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.ConnectorCatalogTable; import org.apache.flink.table.catalog.FunctionLookup; import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.expressions.CallExpression; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.ExpressionDefaultVisitor; @@ -54,6 +56,7 @@ import org.apache.flink.table.operations.WindowAggregateQueryOperation; import org.apache.flink.table.operations.WindowAggregateQueryOperation.ResolvedGroupWindow; import org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor; +import org.apache.flink.table.planner.calcite.FlinkContext; import org.apache.flink.table.planner.calcite.FlinkRelBuilder; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.expressions.PlannerProctimeAttribute; @@ -349,21 +352,19 @@ public RelNode visit(TableSourceQueryOperation tableSourceOperation) { } FlinkStatistic statistic; - List names; + ObjectIdentifier tableIdentifier; if (tableSourceOperation instanceof RichTableSourceQueryOperation && ((RichTableSourceQueryOperation) 
tableSourceOperation).getIdentifier() != null) { - ObjectIdentifier identifier = ((RichTableSourceQueryOperation) tableSourceOperation).getIdentifier(); + tableIdentifier = ((RichTableSourceQueryOperation) tableSourceOperation).getIdentifier(); statistic = ((RichTableSourceQueryOperation) tableSourceOperation).getStatistic(); - names = Arrays.asList( - identifier.getCatalogName(), - identifier.getDatabaseName(), - identifier.getObjectName()); } else { statistic = FlinkStatistic.UNKNOWN(); // TableSourceScan requires a unique name of a Table for computing a digest. // We are using the identity hash of the TableSource object. String refId = "Unregistered_TableSource_" + System.identityHashCode(tableSource); - names = Collections.singletonList(refId); + CatalogManager catalogManager = relBuilder.getCluster().getPlanner().getContext() + .unwrap(FlinkContext.class).getCatalogManager(); + tableIdentifier = catalogManager.qualifyIdentifier(UnresolvedIdentifier.of(refId)); } RelDataType rowType = TableSourceUtil.getSourceRowType(relBuilder.getTypeFactory(), @@ -372,7 +373,7 @@ public RelNode visit(TableSourceQueryOperation tableSourceOperation) { !isBatch); TableSourceTable tableSourceTable = new TableSourceTable<>( relBuilder.getRelOptSchema(), - names, + tableIdentifier, rowType, statistic, tableSource, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.scala index 0011617e3f859..f06bb9c6b3eaa 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.scala @@ -19,10 +19,15 @@ package org.apache.flink.table.planner.plan.rules.logical import org.apache.flink.table.api.TableException +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException +import org.apache.flink.table.catalog.{Catalog, CatalogPartitionSpec, ObjectIdentifier} +import org.apache.flink.table.plan.stats.TableStats import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkTypeFactory} -import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, TableSourceTable} +import org.apache.flink.table.planner.plan.schema.TableSourceTable import org.apache.flink.table.planner.plan.stats.FlinkStatistic import org.apache.flink.table.planner.plan.utils.{FlinkRelOptUtil, PartitionPruner, RexNodeExtractor} +import org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala import org.apache.flink.table.sources.PartitionableTableSource import org.apache.calcite.plan.RelOptRule.{none, operand} @@ -31,6 +36,8 @@ import org.apache.calcite.rel.core.Filter import org.apache.calcite.rel.logical.LogicalTableScan import org.apache.calcite.rex.{RexInputRef, RexNode, RexShuttle} +import java.util + import scala.collection.JavaConversions._ /** @@ -60,12 +67,18 @@ class PushPartitionIntoTableSourceScanRule extends RelOptRule( override def onMatch(call: RelOptRuleCall): Unit = { val filter: Filter = call.rel(0) val scan: LogicalTableScan = call.rel(1) + val context = call.getPlanner.getContext.unwrap(classOf[FlinkContext]) + val config = 
context.getTableConfig val tableSourceTable: TableSourceTable[_] = scan.getTable.unwrap(classOf[TableSourceTable[_]]) + val tableIdentifier = tableSourceTable.tableIdentifier + val catalogOption = toScala(context.getCatalogManager.getCatalog( + tableIdentifier.getCatalogName)) val partitionFieldNames = tableSourceTable.catalogTable.getPartitionKeys.toSeq.toArray[String] val tableSource = tableSourceTable.tableSource.asInstanceOf[PartitionableTableSource] val inputFieldType = filter.getInput.getRowType + val inputFields = inputFieldType.getFieldNames.toList.toArray val relBuilder = call.builder() val maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(scan) @@ -73,7 +86,7 @@ class PushPartitionIntoTableSourceScanRule extends RelOptRule( RexNodeExtractor.extractPartitionPredicates( filter.getCondition, maxCnfNodeCount, - inputFieldType.getFieldNames.toList.toArray, + inputFields, relBuilder.getRexBuilder, partitionFieldNames ) @@ -96,7 +109,7 @@ class PushPartitionIntoTableSourceScanRule extends RelOptRule( val allPartitions = tableSource.getPartitions val remainingPartitions = PartitionPruner.prunePartitions( - call.getPlanner.getContext.unwrap(classOf[FlinkContext]).getTableConfig, + config, partitionFieldNames, partitionFieldTypes, allPartitions, @@ -112,14 +125,28 @@ class PushPartitionIntoTableSourceScanRule extends RelOptRule( } val statistic = tableSourceTable.getStatistic - val newStatistic = if (remainingPartitions.size() == allPartitions.size()) { - // Keep all Statistics if no predicates can be pushed down - statistic - } else if (statistic == FlinkStatistic.UNKNOWN) { - statistic - } else { - // Remove tableStats after predicates pushed down - FlinkStatistic.builder().statistic(statistic).tableStats(null).build() + val newStatistic = { + val tableStats = catalogOption match { + case Some(catalog) => + def mergePartitionStats(): TableStats = { + var stats: TableStats = null + for (p <- remainingPartitions) { + getPartitionStats(catalog, tableIdentifier, p) match { + case Some(currStats) => + if (stats == null) { + stats = currStats + } else { + stats = stats.merge(currStats) + } + case None => return null + } + } + stats + } + mergePartitionStats() + case None => null + } + FlinkStatistic.builder().statistic(statistic).tableStats(tableStats).build() } val newTableSourceTable = tableSourceTable.copy(newTableSource, newStatistic) @@ -133,6 +160,21 @@ class PushPartitionIntoTableSourceScanRule extends RelOptRule( } } + private def getPartitionStats( + catalog: Catalog, + objectIdentifier: ObjectIdentifier, + partSpec: util.Map[String, String]): Option[TableStats] = { + val tablePath = objectIdentifier.toObjectPath + val spec = new CatalogPartitionSpec(new util.LinkedHashMap[String, String](partSpec)) + try { + val tableStatistics = catalog.getPartitionStatistics(tablePath, spec) + val columnStatistics = catalog.getPartitionColumnStatistics(tablePath, spec) + Some(CatalogTableStatisticsConverter.convertToTableStats(tableStatistics, columnStatistics)) + } catch { + case _: PartitionNotExistException => None + } + } + /** * adjust the partition field reference index to evaluate the partition values. * e.g. the original input fields is: a, b, c, p, and p is partition field. 
the partition values diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala index 87bf1ae55117a..db3c6767ebb5e 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.planner.plan.schema import org.apache.flink.table.api.TableException -import org.apache.flink.table.catalog.CatalogTable +import org.apache.flink.table.catalog.{CatalogTable, ObjectIdentifier} import org.apache.flink.table.factories.{TableFactoryUtil, TableSourceFactory} import org.apache.flink.table.sources.{StreamTableSource, TableSource} @@ -70,7 +70,7 @@ class CatalogSourceTable[T]( val cluster = context.getCluster val tableSourceTable = new TableSourceTable[T]( relOptSchema, - names, + schemaTable.getTableIdentifier, rowType, statistic, tableSource, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala index f2c224c0f44c8..24e52aee3803d 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.planner.plan.schema -import org.apache.flink.table.catalog.CatalogTable +import org.apache.flink.table.catalog.{CatalogTable, ObjectIdentifier} import org.apache.flink.table.planner.plan.stats.FlinkStatistic import org.apache.flink.table.sources.{TableSource, TableSourceValidation} @@ -28,6 +28,7 @@ import org.apache.flink.table.api.{TableException, WatermarkSpec} import org.apache.calcite.plan.{RelOptSchema, RelOptTable} +import java.util import java.util.{List => JList} import scala.collection.JavaConverters._ @@ -39,19 +40,27 @@ import scala.collection.JavaConverters._ * *

It also defines the [[copy]] method used for push down rules. * + * @param tableIdentifier full path of the table to retrieve. * @param tableSource The [[TableSource]] for which is converted to a Calcite Table * @param isStreamingMode A flag that tells if the current table is in stream mode * @param catalogTable Catalog table where this table source table comes from */ class TableSourceTable[T]( relOptSchema: RelOptSchema, - names: JList[String], + val tableIdentifier: ObjectIdentifier, rowType: RelDataType, statistic: FlinkStatistic, val tableSource: TableSource[T], val isStreamingMode: Boolean, val catalogTable: CatalogTable) - extends FlinkPreparingTableBase(relOptSchema, rowType, names, statistic) { + extends FlinkPreparingTableBase( + relOptSchema, + rowType, + util.Arrays.asList( + tableIdentifier.getCatalogName, + tableIdentifier.getDatabaseName, + tableIdentifier.getObjectName), + statistic) { Preconditions.checkNotNull(tableSource) Preconditions.checkNotNull(statistic) @@ -79,8 +88,14 @@ class TableSourceTable[T]( * @return New TableSourceTable instance with specified table source and [[FlinkStatistic]] */ def copy(tableSource: TableSource[_], statistic: FlinkStatistic): TableSourceTable[T] = { - new TableSourceTable[T](relOptSchema, names, rowType, statistic, - tableSource.asInstanceOf[TableSource[T]], isStreamingMode, catalogTable) + new TableSourceTable[T]( + relOptSchema, + tableIdentifier, + rowType, + statistic, + tableSource.asInstanceOf[TableSource[T]], + isStreamingMode, + catalogTable) } /** @@ -100,7 +115,13 @@ class TableSourceTable[T]( .map(idx => rowType.getFieldList.get(idx)) .toList .asJava) - new TableSourceTable[T](relOptSchema, names, newRowType, statistic, - tableSource.asInstanceOf[TableSource[T]], isStreamingMode, catalogTable) + new TableSourceTable[T]( + relOptSchema, + tableIdentifier, + newRowType, + statistic, + tableSource.asInstanceOf[TableSource[T]], + isStreamingMode, + catalogTable) } } diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java index b60b0cdf63be0..c6dacac8699bc 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java @@ -22,7 +22,10 @@ import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogPartitionImpl; +import org.apache.flink.table.catalog.CatalogPartitionSpec; import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.ConnectorCatalogTable; import org.apache.flink.table.catalog.ObjectPath; @@ -38,9 +41,12 @@ import org.apache.flink.table.catalog.stats.CatalogTableStatistics; import org.apache.flink.table.catalog.stats.Date; import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.plan.stats.TableStats; +import org.apache.flink.table.planner.delegation.PlannerBase; import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery; import org.apache.flink.table.planner.plan.stats.ValueInterval$; import 
org.apache.flink.table.planner.utils.TableTestUtil; +import org.apache.flink.table.planner.utils.TestPartitionableSourceFactory; import org.apache.flink.table.planner.utils.TestTableSource; import org.apache.flink.table.types.DataType; @@ -52,6 +58,7 @@ import java.util.Arrays; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import static org.junit.Assert.assertEquals; @@ -140,6 +147,154 @@ private void alterTableStatistics( catalog.alterTableColumnStatistics(new ObjectPath(databaseName, tableName), createColumnStats(), true); } + @Test + public void testGetPartitionStatsFromCatalog() throws Exception { + TestPartitionableSourceFactory.registerTableSource(tEnv, "PartT", true); + createPartitionStats("A", 1); + createPartitionColumnStats("A", 1); + createPartitionStats("A", 2); + createPartitionColumnStats("A", 2); + + RelNode t1 = ((PlannerBase) ((TableEnvironmentImpl) tEnv).getPlanner()).optimize( + TableTestUtil.toRelNode(tEnv.sqlQuery("select id, name from PartT where part1 = 'A'"))); + FlinkRelMetadataQuery mq = FlinkRelMetadataQuery.reuseOrCreate(t1.getCluster().getMetadataQuery()); + assertEquals(200.0, mq.getRowCount(t1), 0.0); + assertEquals(Arrays.asList(8.0, 43.5), mq.getAverageColumnSizes(t1)); + + // long type + assertEquals(46.0, mq.getDistinctRowCount(t1, ImmutableBitSet.of(0), null), 0.0); + assertEquals(154.0, mq.getColumnNullCount(t1, 0), 0.0); + assertEquals(ValueInterval$.MODULE$.apply(-123L, 763322L, true, true), mq.getColumnInterval(t1, 0)); + + // string type + assertEquals(40.0, mq.getDistinctRowCount(t1, ImmutableBitSet.of(1), null), 0.0); + assertEquals(0.0, mq.getColumnNullCount(t1, 1), 0.0); + assertNull(mq.getColumnInterval(t1, 1)); + } + + @Test + public void testGetPartitionStatsWithUnknownRowCount() throws Exception { + TestPartitionableSourceFactory.registerTableSource(tEnv, "PartT", true); + createPartitionStats("A", 1, TableStats.UNKNOWN.getRowCount()); + createPartitionColumnStats("A", 1); + createPartitionStats("A", 2); + createPartitionColumnStats("A", 2); + + RelNode t1 = ((PlannerBase) ((TableEnvironmentImpl) tEnv).getPlanner()).optimize( + TableTestUtil.toRelNode(tEnv.sqlQuery("select id, name from PartT where part1 = 'A'"))); + FlinkRelMetadataQuery mq = FlinkRelMetadataQuery.reuseOrCreate(t1.getCluster().getMetadataQuery()); + assertEquals(100_000_000, mq.getRowCount(t1), 0.0); + assertEquals(Arrays.asList(8.0, 43.5), mq.getAverageColumnSizes(t1)); + + // long type + assertEquals(46.0, mq.getDistinctRowCount(t1, ImmutableBitSet.of(0), null), 0.0); + assertEquals(154.0, mq.getColumnNullCount(t1, 0), 0.0); + assertEquals(ValueInterval$.MODULE$.apply(-123L, 763322L, true, true), mq.getColumnInterval(t1, 0)); + + // string type + assertEquals(40.0, mq.getDistinctRowCount(t1, ImmutableBitSet.of(1), null), 0.0); + assertEquals(0.0, mq.getColumnNullCount(t1, 1), 0.0); + assertNull(mq.getColumnInterval(t1, 1)); + } + + @Test + public void testGetPartitionStatsWithUnknownColumnStats() throws Exception { + TestPartitionableSourceFactory.registerTableSource(tEnv, "PartT", true); + createPartitionStats("A", 1); + createPartitionStats("A", 2); + createPartitionColumnStats("A", 2); + + RelNode t1 = ((PlannerBase) ((TableEnvironmentImpl) tEnv).getPlanner()).optimize( + TableTestUtil.toRelNode(tEnv.sqlQuery("select id, name from PartT where part1 = 'A'"))); + FlinkRelMetadataQuery mq = FlinkRelMetadataQuery.reuseOrCreate(t1.getCluster().getMetadataQuery()); + assertEquals(200.0, mq.getRowCount(t1), 0.0); + + // long type + 
assertNull(mq.getDistinctRowCount(t1, ImmutableBitSet.of(0), null)); + assertNull(mq.getColumnNullCount(t1, 0)); + assertNull(mq.getColumnInterval(t1, 0)); + + // string type + assertNull(mq.getDistinctRowCount(t1, ImmutableBitSet.of(1), null)); + assertNull(mq.getColumnNullCount(t1, 1)); + } + + @Test + public void testGetPartitionStatsWithSomeUnknownColumnStats() throws Exception { + TestPartitionableSourceFactory.registerTableSource(tEnv, "PartT", true); + createPartitionStats("A", 1); + createPartitionColumnStats("A", 1, true); + createPartitionStats("A", 2); + createPartitionColumnStats("A", 2); + + RelNode t1 = ((PlannerBase) ((TableEnvironmentImpl) tEnv).getPlanner()).optimize( + TableTestUtil.toRelNode(tEnv.sqlQuery("select id, name from PartT where part1 = 'A'"))); + FlinkRelMetadataQuery mq = FlinkRelMetadataQuery.reuseOrCreate(t1.getCluster().getMetadataQuery()); + assertEquals(200.0, mq.getRowCount(t1), 0.0); + + // long type + assertNull(mq.getDistinctRowCount(t1, ImmutableBitSet.of(0), null)); + assertNull(mq.getColumnNullCount(t1, 0)); + assertNull(mq.getColumnInterval(t1, 0)); + + // string type + assertNull(mq.getDistinctRowCount(t1, ImmutableBitSet.of(1), null)); + assertNull(mq.getColumnNullCount(t1, 1)); + } + + private void createPartitionStats(String part1, int part2) throws Exception { + createPartitionStats(part1, part2, 100); + } + + private void createPartitionStats( + String part1, int part2, long rowCount) throws Exception { + ObjectPath path = ObjectPath.fromString("default_database.PartT"); + + LinkedHashMap partSpecMap = new LinkedHashMap<>(); + partSpecMap.put("part1", part1); + partSpecMap.put("part2", String.valueOf(part2)); + CatalogPartitionSpec partSpec = new CatalogPartitionSpec(partSpecMap); + catalog.createPartition( + path, + partSpec, + new CatalogPartitionImpl(new HashMap<>(), ""), + true); + catalog.alterPartitionStatistics( + path, + partSpec, + new CatalogTableStatistics(rowCount, 10, 1000L, 2000L), + true); + } + + private void createPartitionColumnStats(String part1, int part2) throws Exception { + createPartitionColumnStats(part1, part2, false); + } + + private void createPartitionColumnStats(String part1, int part2, boolean unknown) throws Exception { + ObjectPath path = ObjectPath.fromString("default_database.PartT"); + LinkedHashMap partSpecMap = new LinkedHashMap<>(); + partSpecMap.put("part1", part1); + partSpecMap.put("part2", String.valueOf(part2)); + CatalogPartitionSpec partSpec = new CatalogPartitionSpec(partSpecMap); + + CatalogColumnStatisticsDataLong longColStats = new CatalogColumnStatisticsDataLong( + -123L, 763322L, 23L, 77L); + CatalogColumnStatisticsDataString stringColStats = new CatalogColumnStatisticsDataString( + 152L, 43.5D, 20L, 0L); + Map colStatsMap = new HashMap<>(); + colStatsMap.put("id", unknown ? + new CatalogColumnStatisticsDataLong(null, null, null, null) : + longColStats); + colStatsMap.put("name", unknown ? 
+ new CatalogColumnStatisticsDataString(null, null, null, null) : + stringColStats); + catalog.alterPartitionColumnStatistics( + path, + partSpec, + new CatalogColumnStatistics(colStatsMap), + true); + } + private CatalogColumnStatistics createColumnStats() { CatalogColumnStatisticsDataBoolean booleanColStats = new CatalogColumnStatisticsDataBoolean(55L, 45L, 5L); CatalogColumnStatisticsDataLong longColStats = new CatalogColumnStatisticsDataLong(-123L, 763322L, 23L, 77L); diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala index d69bb2c271dee..b596ad61822ea 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala @@ -805,6 +805,16 @@ object TestPartitionableSourceFactory { .field("part2", DataTypes.INT()) .build() + /** + * For java invoking. + */ + def registerTableSource( + tEnv: TableEnvironment, + tableName: String, + isBounded: Boolean): Unit = { + registerTableSource(tEnv, tableName, isBounded, tableSchema = tableSchema) + } + def registerTableSource( tEnv: TableEnvironment, tableName: String,