[SPARK-29277][SQL] Add early DSv2 filter and projection pushdown
### What changes were proposed in this pull request?

This adds a new rule, `V2ScanRelationPushDown`, that pushes filters and projections into a new `DataSourceV2ScanRelation` in the optimizer. That scan is then used when converting to a physical scan node, and the new relation correctly reports stats based on the scan.
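
For illustration, here is a minimal sketch of the shape of such a rule. It is not the actual `V2ScanRelationPushDown` implementation: it assumes the standard DSv2 `SupportsRead` API and elides filter translation, column pruning, and post-scan filter handling.

```scala
import org.apache.spark.sql.catalyst.expressions.And
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.SupportsRead
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}

// Simplified sketch of an early pushdown rule (not the real implementation).
object SimplifiedScanPushDown extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    case PhysicalOperation(projects, filters, relation: DataSourceV2Relation) =>
      // Build a Scan for the relation; a real rule would push filters and prune
      // columns through the ScanBuilder mix-ins before calling build().
      val scan = relation.table match {
        case readable: SupportsRead => readable.newScanBuilder(relation.options).build()
        case other =>
          throw new IllegalArgumentException(s"Table does not support reads: ${other.name}")
      }
      // Replace the relation with a scan-backed relation so that later rules
      // (e.g. cost-based join reordering) see stats that reflect the pushdown.
      val scanRelation = DataSourceV2ScanRelation(relation.table, scan, relation.output)
      val withFilters = filters.reduceOption(And)
        .map(Filter(_, scanRelation))
        .getOrElse(scanRelation)
      Project(projects, withFilters)
  }
}
```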

To run scan pushdown before the rules that use stats, this adds a new optimizer override, `earlyScanPushDownRules`, and an early pushdown batch in the optimizer that runs before cost-based join reordering. The other early pushdown rule, `PruneFileSourcePartitions`, is moved into this early pushdown rule set.
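
As a sketch of the new extension point (the class and rule names below are hypothetical; `SparkOptimizer` itself overrides this hook as shown in the diff), an optimizer subclass can contribute its own early pushdown rules:

```scala
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogManager

// Hypothetical optimizer subclass using the new earlyScanPushDownRules hook.
class CustomOptimizer(catalogManager: CatalogManager) extends Optimizer(catalogManager) {

  // Placeholder rule standing in for a source-specific pushdown rule.
  object CustomScanPushDown extends Rule[LogicalPlan] {
    override def apply(plan: LogicalPlan): LogicalPlan = plan
  }

  // Rules returned here run in the "Early Filter and Projection Push-Down" batch,
  // before cost-based join reordering and any other stats-dependent rules.
  override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] = CustomScanPushDown :: Nil
}
```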

This also moves pushdown helper methods from `DataSourceV2Strategy` into a util class.
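
The util class itself is not shown in this excerpt; conceptually, the helpers wrap the DSv2 `ScanBuilder` mix-ins. A rough, illustrative sketch of that kind of helper (the object and method names here are hypothetical, not the moved methods):

```scala
import org.apache.spark.sql.connector.read.{ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Illustrative helper object: push data source filters and prune columns when the
// ScanBuilder supports it, returning the filters that must still be applied after the scan.
object ScanBuilderHelpers {
  def pushFilters(builder: ScanBuilder, filters: Seq[Filter]): Seq[Filter] = builder match {
    case s: SupportsPushDownFilters => s.pushFilters(filters.toArray).toSeq
    case _ => filters
  }

  def pruneColumns(builder: ScanBuilder, requiredSchema: StructType): Unit = builder match {
    case s: SupportsPushDownRequiredColumns => s.pruneColumns(requiredSchema)
    case _ => // builder does not support column pruning; the scan keeps the full schema
  }
}
```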

### Why are the changes needed?

This is needed for DSv2 sources to supply stats for cost-based rules in the optimizer.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

This updates the stats implementation of `DataSourceV2Relation` so that tests fail if stats are accessed on a v2 relation before early pushdown has run.

Closes #25955 from rdblue/move-v2-pushdown.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Ryan Blue <blue@apache.org>
rdblue committed Oct 31, 2019
1 parent 8207c83 commit cfc80d0
Showing 17 changed files with 305 additions and 162 deletions.
@@ -681,10 +681,18 @@ class Analyzer(
.map(v2Relation => i.copy(table = v2Relation))
.getOrElse(i)

case desc @ DescribeTable(u: UnresolvedV2Relation, _) =>
CatalogV2Util.loadRelation(u.catalog, u.tableName)
.map(rel => desc.copy(table = rel))
.getOrElse(desc)

case alter @ AlterTable(_, _, u: UnresolvedV2Relation, _) =>
CatalogV2Util.loadRelation(u.catalog, u.tableName)
.map(rel => alter.copy(table = rel))
.getOrElse(alter)

case u: UnresolvedV2Relation =>
CatalogV2Util.loadTable(u.catalog, u.tableName).map { table =>
DataSourceV2Relation.create(table)
}.getOrElse(u)
CatalogV2Util.loadRelation(u.catalog, u.tableName).getOrElse(u)
}
}

@@ -104,6 +104,20 @@ trait CheckAnalysis extends PredicateHelper {
case u: UnresolvedV2Relation =>
u.failAnalysis(s"Table not found: ${u.originalNameParts.quoted}")

case AlterTable(_, _, u: UnresolvedV2Relation, _) if isView(u.originalNameParts) =>
u.failAnalysis(
s"Invalid command: '${u.originalNameParts.quoted}' is a view not a table.")

case AlterTable(_, _, u: UnresolvedV2Relation, _) =>
failAnalysis(s"Table not found: ${u.originalNameParts.quoted}")

case DescribeTable(u: UnresolvedV2Relation, _) if isView(u.originalNameParts) =>
u.failAnalysis(
s"Invalid command: '${u.originalNameParts.quoted}' is a view not a table.")

case DescribeTable(u: UnresolvedV2Relation, _) =>
failAnalysis(s"Table not found: ${u.originalNameParts.quoted}")

case operator: LogicalPlan =>
// Check argument data types of higher-order functions downwards first.
// If the arguments of the higher-order functions are resolved but the type check fails,
@@ -119,7 +119,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
rulesWithoutInferFiltersFromConstraints: _*) :: Nil
}

(Batch("Eliminate Distinct", Once, EliminateDistinct) ::
val batches = (Batch("Eliminate Distinct", Once, EliminateDistinct) ::
// Technically some of the rules in Finish Analysis are not optimizer rules and belong more
// in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
// However, because we also use the analyzer to canonicalized queries (for view definition),
@@ -170,6 +170,10 @@ abstract class Optimizer(catalogManager: CatalogManager)
RemoveLiteralFromGroupExpressions,
RemoveRepetitionFromGroupExpressions) :: Nil ++
operatorOptimizationBatch) :+
// This batch pushes filters and projections into scan nodes. Before this batch, the logical
// plan may contain nodes that do not report stats. Anything that uses stats must run after
// this batch.
Batch("Early Filter and Projection Push-Down", Once, earlyScanPushDownRules: _*) :+
// Since join costs in AQP can change between multiple runs, there is no reason that we have an
// idempotence enforcement on this batch. We thus make it FixedPoint(1) instead of Once.
Batch("Join Reorder", FixedPoint(1),
@@ -196,6 +200,9 @@ abstract class Optimizer(catalogManager: CatalogManager)
RemoveNoopOperators) :+
// This batch must be executed after the `RewriteSubquery` batch, which creates joins.
Batch("NormalizeFloatingNumbers", Once, NormalizeFloatingNumbers)

// remove any batches with no rules. this may happen when subclasses do not add optional rules.
batches.filter(_.rules.nonEmpty)
}

/**
@@ -253,6 +260,11 @@ abstract class Optimizer(catalogManager: CatalogManager)
*/
def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil

/**
* Override to provide additional rules for early projection and filter pushdown to scans.
*/
def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] = Nil

/**
* Returns (defaultBatches - (excludedRules - nonExcludableRules)), the rule batches that
* eventually run in the Optimizer.
@@ -271,7 +271,7 @@ case class ShowNamespaces(
*/
case class DescribeTable(table: NamedRelation, isExtended: Boolean) extends Command {

override def children: Seq[LogicalPlan] = Seq(table)
override lazy val resolved: Boolean = table.resolved

override def output: Seq[Attribute] = DescribeTableSchema.describeTableAttributes()
}
@@ -313,9 +313,7 @@ case class AlterTable(
table: NamedRelation,
changes: Seq[TableChange]) extends Command {

override def children: Seq[LogicalPlan] = Seq(table)

override lazy val resolved: Boolean = childrenResolved && {
override lazy val resolved: Boolean = table.resolved && {
changes.forall {
case add: AddColumn =>
add.fieldNames match {
@@ -24,9 +24,10 @@ import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedV2Relation}
import org.apache.spark.sql.catalyst.analysis.{NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedV2Relation}
import org.apache.spark.sql.catalyst.plans.logical.AlterTable
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}

private[sql] object CatalogV2Util {
@@ -224,6 +225,10 @@ private[sql] object CatalogV2Util {
case _: NoSuchNamespaceException => None
}

def loadRelation(catalog: CatalogPlugin, ident: Identifier): Option[NamedRelation] = {
loadTable(catalog, ident).map(DataSourceV2Relation.create)
}

def isSessionCatalog(catalog: CatalogPlugin): Boolean = {
catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME)
}
@@ -26,6 +26,7 @@ import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, Statistics => V2S
import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
import org.apache.spark.sql.connector.write.WriteBuilder
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils

/**
* A logical plan representing a data source v2 table.
@@ -50,12 +51,53 @@ case class DataSourceV2Relation(
s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
}

def newScanBuilder(): ScanBuilder = {
table.asReadable.newScanBuilder(options)
override def computeStats(): Statistics = {
if (Utils.isTesting) {
// when testing, throw an exception if this computeStats method is called because stats should
// not be accessed before pushing the projection and filters to create a scan. otherwise, the
// stats are not accurate because they are based on a full table scan of all columns.
throw new IllegalStateException(
s"BUG: computeStats called before pushdown on DSv2 relation: $name")
} else {
// when not testing, return stats because bad stats are better than failing a query
table.asReadable.newScanBuilder(options) match {
case r: SupportsReportStatistics =>
val statistics = r.estimateStatistics()
DataSourceV2Relation.transformV2Stats(statistics, None, conf.defaultSizeInBytes)
case _ =>
Statistics(sizeInBytes = conf.defaultSizeInBytes)
}
}
}

override def newInstance(): DataSourceV2Relation = {
copy(output = output.map(_.newInstance()))
}
}

/**
* A logical plan for a DSv2 table with a scan already created.
*
* This is used in the optimizer to push filters and projection down before conversion to physical
* plan. This ensures that the stats that are used by the optimizer account for the filters and
* projection that will be pushed down.
*
* @param table a DSv2 [[Table]]
* @param scan a DSv2 [[Scan]]
* @param output the output attributes of this relation
*/
case class DataSourceV2ScanRelation(
table: Table,
scan: Scan,
output: Seq[AttributeReference]) extends LeafNode with NamedRelation {

override def name: String = table.name()

override def simpleString(maxFields: Int): String = {
s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
}

override def computeStats(): Statistics = {
val scan = newScanBuilder().build()
scan match {
case r: SupportsReportStatistics =>
val statistics = r.estimateStatistics()
@@ -64,10 +106,6 @@ case class DataSourceV2Relation(
Statistics(sizeInBytes = conf.defaultSizeInBytes)
}
}

override def newInstance(): DataSourceV2Relation = {
copy(output = output.map(_.newInstance()))
}
}

/**
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -51,7 +51,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, ArrowConverters}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, FileTable}
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.execution.stat.StatFunctions
import org.apache.spark.sql.internal.SQLConf
@@ -3218,7 +3218,7 @@ class Dataset[T] private[sql](
fr.inputFiles
case r: HiveTableRelation =>
r.tableMeta.storage.locationUri.map(_.toString).toArray
case DataSourceV2Relation(table: FileTable, _, _) =>
case DataSourceV2ScanRelation(table: FileTable, _, _) =>
table.fileIndex.inputFiles
}.flatten
files.toSet.toArray
@@ -20,10 +20,13 @@ package org.apache.spark.sql.execution
import org.apache.spark.sql.ExperimentalMethods
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.optimizer._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.dynamicpruning.{CleanupDynamicPruningFilters, PartitionPruning}
import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions
import org.apache.spark.sql.execution.datasources.SchemaPruning
import org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown
import org.apache.spark.sql.execution.python.{ExtractGroupingPythonUDFFromAggregate, ExtractPythonUDFFromAggregate, ExtractPythonUDFs}

class SparkOptimizer(
@@ -32,10 +35,12 @@ class SparkOptimizer(
experimentalMethods: ExperimentalMethods)
extends Optimizer(catalogManager) {

override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] =
// TODO: move SchemaPruning into catalyst
SchemaPruning :: PruneFileSourcePartitions :: V2ScanRelationPushDown :: Nil

override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+
Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+
Batch("Schema Pruning", Once, SchemaPruning) :+
Batch("PartitionPruning", Once,
PartitionPruning,
OptimizeSubqueries) :+
@@ -64,7 +69,8 @@ class SparkOptimizer(
override def nonExcludableRules: Seq[String] = super.nonExcludableRules :+
ExtractPythonUDFFromJoinCondition.ruleName :+
ExtractPythonUDFFromAggregate.ruleName :+ ExtractGroupingPythonUDFFromAggregate.ruleName :+
ExtractPythonUDFs.ruleName
ExtractPythonUDFs.ruleName :+
V2ScanRelationPushDown.ruleName

/**
* Optimization batches that are executed before the regular optimization batches (also before
