[SPARK-32708] Query optimization fails to reuse exchange with DataSourceV2 #29564

Closed

@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelat
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.sources.DataSourceRegister
+ import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, WriteSupport}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsReportStatistics}
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
@@ -54,7 +55,7 @@ case class DataSourceV2Relation(
tableIdent.map(_.unquotedString).getOrElse(s"${source.name}:unknown")
}

- override def pushedFilters: Seq[Expression] = Seq.empty
+ override def pushedFilters: Seq[Filter] = Seq.empty

Member

Why do we need to change Expression to Filter, which is a public interface?

Contributor Author

More explanation: why change from Expression to org.apache.spark.sql.sources.Filter?

DataSourceV2ScanExec.pushedFilters is defined as a sequence of Expressions, and an Expression's equality check takes the expression id into account. For example, the Expression isnotnull(d_day_name#22364) is not considered equal to isnotnull(d_day_name#22420), even though both describe the same pushed-down filter. The right fix is therefore to define and compare pushedFilters as the Filter class, as the sketch below illustrates.

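A minimal sketch of the difference, using the column from the example above (illustrative only, not part of this patch):

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, IsNotNull => CatalystIsNotNull}
import org.apache.spark.sql.sources.IsNotNull
import org.apache.spark.sql.types.StringType

// Two scans of the same table produce attributes with distinct expression ids.
val a1 = AttributeReference("d_day_name", StringType)()
val a2 = AttributeReference("d_day_name", StringType)()

// Catalyst expression equality includes the expression id, so these two
// semantically identical filters are NOT equal, and the scans never match.
assert(CatalystIsNotNull(a1) != CatalystIsNotNull(a2))

// The public data source Filter only carries the column name, so these ARE equal.
assert(IsNotNull("d_day_name") == IsNotNull("d_day_name"))
```
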
In both Spark 3.0 and the affected Spark 2.4 test suites, Filter is already the class being used, and the four places above appear to be the only ones that still declare pushedFilters as Expression.
(Because pushedFilters is defined the right way in those test suites, SPARK-32708 was not caught by the tests I previously added for SPARK-32609, another exchange-reuse bug.)

The usage of Expression was introduced by PR [SPARK-23203][SQL] DataSourceV2: Use immutable logical plan. From that PR's description and original intent, I don't see a necessary reason to keep Expression here.


override def simpleString: String = "RelationV2 " + metadataString

@@ -92,7 +93,7 @@ case class StreamingDataSourceV2Relation(

override def simpleString: String = "Streaming RelationV2 " + metadataString

- override def pushedFilters: Seq[Expression] = Nil
+ override def pushedFilters: Seq[Filter] = Nil

override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))

@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
import org.apache.spark.sql.execution.streaming.continuous._
+ import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.DataSourceV2
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader
@@ -38,7 +39,7 @@ case class DataSourceV2ScanExec(
output: Seq[AttributeReference],
@transient source: DataSourceV2,
@transient options: Map[String, String],
- @transient pushedFilters: Seq[Expression],
+ @transient pushedFilters: Seq[Filter],
@transient reader: DataSourceReader)
extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan {

@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, Rep
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
+ import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader

@@ -38,11 +39,11 @@ object DataSourceV2Strategy extends Strategy {
*/
private def pushFilters(
reader: DataSourceReader,
- filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
+ filters: Seq[Expression]): (Seq[Filter], Seq[Expression]) = {
reader match {
case r: SupportsPushDownFilters =>
// A map from translated data source filters to original catalyst filter expressions.
- val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, Expression]
+ val translatedFilterToExpr = mutable.HashMap.empty[Filter, Expression]
// Catalyst filter expression that can't be translated to data source filters.
val untranslatableExprs = mutable.ArrayBuffer.empty[Expression]

@@ -61,8 +62,7 @@
val postScanFilters = r.pushFilters(translatedFilterToExpr.keys.toArray)
.map(translatedFilterToExpr)
// The filters which are marked as pushed to this data source
- val pushedFilters = r.pushedFilters().map(translatedFilterToExpr)
- (pushedFilters, untranslatableExprs ++ postScanFilters)
+ (r.pushedFilters(), untranslatableExprs ++ postScanFilters)

case _ => (Nil, filters)
}
@@ -21,6 +21,7 @@ import org.apache.commons.lang3.StringUtils

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.sources.DataSourceRegister
+ import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.DataSourceV2
import org.apache.spark.util.Utils

@@ -49,7 +50,7 @@ trait DataSourceV2StringFormat {
/**
* The filters which have been pushed to the data source.
*/
- def pushedFilters: Seq[Expression]
+ def pushedFilters: Seq[Filter]

private def sourceName: String = source match {
case registered: DataSourceRegister => registered.shortName()