From 710d073281a9499d82a531bb5b4343fee8b16961 Mon Sep 17 00:00:00 2001
From: Anton Lykov
Date: Fri, 24 Apr 2026 19:19:03 +0000
Subject: [PATCH 1/2] [SPARK-XXXXX][SQL] Introduce SupportsReportCatalogStatistics mixin for Table

Adds a Table mixin that lets DSv2 connectors expose table-level (pre-filter,
pre-pruning) statistics without going through a Scan.
DataSourceV2ScanRelation.computeStats prefers the mixin when the table
implements it and reports numRows; otherwise it falls through to the existing
Scan-based path unchanged.
---
 .../SupportsReportCatalogStatistics.java      | 57 +++++++++++++++++++
 .../datasources/v2/DataSourceV2Relation.scala | 16 +++++-
 2 files changed, 72 insertions(+), 1 deletion(-)
 create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsReportCatalogStatistics.java

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsReportCatalogStatistics.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsReportCatalogStatistics.java
new file mode 100644
index 000000000000..43a8af3fedbe
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsReportCatalogStatistics.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.read.Statistics;
+
+/**
+ * A mix-in interface for {@link Table}. Data sources can implement this interface to report
+ * table-level statistics to Spark.
+ * <p>
+ * The statistics returned here describe the table as a whole, independent of any {@code Scan}
+ * and independent of any pushed operators (filters, partition pruning, column pruning,
+ * aggregate pushdown). They are analogous to DSv1's {@code CatalogStatistics} exposed via
+ * {@code CatalogTable.stats}: a stable, pre-filter, pre-pruning property of the table.
+ * <p>
+ * This is distinct from
+ * {@link org.apache.spark.sql.connector.read.SupportsReportStatistics} on {@code Scan}, which
+ * reports post-pushdown statistics that reflect the data the scan will actually
+ * produce. A table may implement both: catalog statistics drive CBO decisions on the
+ * unfiltered relation (join reordering, broadcast thresholds), while scan statistics tighten
+ * estimates once pushdown has happened.
+ * <p>
+ * Implementations should return statistics without triggering expensive work such as file
+ * listing or remote metadata fetches beyond what is already cached on the {@code Table}
+ * instance. Consumers may call this method during optimization, potentially more than once
+ * per query, and expect a consistent result for a given {@code Table} instance.
+ *
+ * @since 4.2.0
+ */
+@Evolving
+public interface SupportsReportCatalogStatistics extends Table {
+
+  /**
+   * Returns table-level statistics for this table.
+   * <p>
+   * The returned statistics must describe the full table, not a filtered or pruned subset.
+   * When no statistics are available, return {@link Statistics#emptyStatistics} rather than
+   * {@code null}.
+   */
+  Statistics catalogStatistics();
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 19371dcb94de..e6e1108ab4a5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, ExposesMetadataC
 import org.apache.spark.sql.catalyst.streaming.{StreamingSourceIdentifyingName, Unassigned}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.catalyst.util.{truncatedString, CharVarcharUtils}
-import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, SupportsMetadataColumns, Table, TableCapability, TableCatalog, V2TableUtil}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, SupportsMetadataColumns, SupportsReportCatalogStatistics, Table, TableCapability, TableCatalog, V2TableUtil}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
 import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
 import org.apache.spark.sql.connector.read.{Scan, Statistics => V2Statistics, SupportsReportStatistics, SupportsRuntimeV2Filtering}
@@ -199,6 +199,20 @@ case class DataSourceV2ScanRelation(
   }
 
   override def computeStats(): Statistics = {
+    // If the table reports catalog-level (pre-filter, pre-pruning) statistics via
+    // SupportsReportCatalogStatistics and those stats carry a row count, prefer them:
+    // they describe the full table independent of any pushdown and are typically the
+    // strongest stats available to CBO. Otherwise fall through to the scan's own
+    // post-pushdown statistics as before.
+    relation.table match {
+      case t: SupportsReportCatalogStatistics =>
+        val catalogStats = t.catalogStatistics()
+        if (catalogStats.numRows().isPresent) {
+          return DataSourceV2Relation.transformV2Stats(
+            catalogStats, None, conf.defaultSizeInBytes, output)
+        }
+      case _ =>
+    }
     scan match {
       case r: SupportsReportStatistics =>
         val statistics = r.estimateStatistics()
From c74217fd4cb2c411c50104889f7bbf247eb59f55 Mon Sep 17 00:00:00 2001
From: Anton Lykov
Date: Fri, 24 Apr 2026 22:19:27 +0200
Subject: [PATCH 2/2] Update DataSourceV2Relation.scala

---
 .../datasources/v2/DataSourceV2Relation.scala | 16 +---------------
 1 file changed, 1 insertion(+), 15 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index e6e1108ab4a5..19371dcb94de 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, ExposesMetadataC
 import org.apache.spark.sql.catalyst.streaming.{StreamingSourceIdentifyingName, Unassigned}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.catalyst.util.{truncatedString, CharVarcharUtils}
-import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, SupportsMetadataColumns, SupportsReportCatalogStatistics, Table, TableCapability, TableCatalog, V2TableUtil}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, SupportsMetadataColumns, Table, TableCapability, TableCatalog, V2TableUtil}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
 import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
 import org.apache.spark.sql.connector.read.{Scan, Statistics => V2Statistics, SupportsReportStatistics, SupportsRuntimeV2Filtering}
@@ -199,20 +199,6 @@ case class DataSourceV2ScanRelation(
   }
 
   override def computeStats(): Statistics = {
-    // If the table reports catalog-level (pre-filter, pre-pruning) statistics via
-    // SupportsReportCatalogStatistics and those stats carry a row count, prefer them:
-    // they describe the full table independent of any pushdown and are typically the
-    // strongest stats available to CBO. Otherwise fall through to the scan's own
-    // post-pushdown statistics as before.
-    relation.table match {
-      case t: SupportsReportCatalogStatistics =>
-        val catalogStats = t.catalogStatistics()
-        if (catalogStats.numRows().isPresent) {
-          return DataSourceV2Relation.transformV2Stats(
-            catalogStats, None, conf.defaultSizeInBytes, output)
-        }
-      case _ =>
-    }
     scan match {
       case r: SupportsReportStatistics =>
         val statistics = r.estimateStatistics()
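A minimal sketch (not part of either patch above) of how a DSv2 connector could adopt the mixin added in PATCH 1/2. Everything here is hypothetical: the class name, the constructor parameters, and the assumption that the connector already has a row count and size cached when the Table object is built. It also assumes the other Table and Statistics methods keep their current default implementations; a real connector would additionally implement SupportsRead and the rest of its usual Table surface.

import java.util
import java.util.OptionalLong

import org.apache.spark.sql.connector.catalog.{SupportsReportCatalogStatistics, TableCapability}
import org.apache.spark.sql.connector.read.Statistics
import org.apache.spark.sql.types.StructType

// Hypothetical connector table: the row count and size are assumed to be already cached
// from the connector's own metadata, so reporting them triggers no file listing or RPCs.
class CachedStatsTable(
    tableName: String,
    tableSchema: StructType,
    numRowsFromMetadata: Long,
    sizeInBytesFromMetadata: Long)
  extends SupportsReportCatalogStatistics {

  override def name(): String = tableName

  override def schema(): StructType = tableSchema

  // Empty for brevity; a real table would declare BATCH_READ and implement SupportsRead.
  override def capabilities(): util.Set[TableCapability] =
    util.Collections.emptySet[TableCapability]()

  // Whole-table, pre-filter statistics, per the contract documented on the mixin.
  override def catalogStatistics(): Statistics = new Statistics {
    override def sizeInBytes(): OptionalLong = OptionalLong.of(sizeInBytesFromMetadata)
    override def numRows(): OptionalLong = OptionalLong.of(numRowsFromMetadata)
  }
}

With a table like this, the PATCH 1/2 version of DataSourceV2ScanRelation.computeStats picks up numRows before any pushdown instead of waiting for the Scan's estimateStatistics(); PATCH 2/2 then backs that wiring out, leaving only the new interface in the series.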