[SPARK-23877][SQL][followup] use PhysicalOperation to simplify the handling of Project and Filter over partitioned relation #21111

Closed
wants to merge 2 commits into from
@@ -43,6 +43,12 @@ object LocalRelation {
}
}

/**
* Logical plan node for scanning data from a local collection.
*
* @param data The local collection holding the data. It doesn't need to be sent to executors
* and therefore doesn't need to be serializable.
*/
case class LocalRelation(
output: Seq[Attribute],
data: Seq[InternalRow] = Nil,
@@ -25,6 +25,9 @@ import org.apache.spark.sql.execution.metric.SQLMetrics

/**
* Physical plan node for scanning data from a local collection.
*
* `Seq` may not be serializable and ideally we should not send `rows` and `unsafeRows`
* to the executors. Thus we mark them as transient.
*/
case class LocalTableScanExec(
output: Seq[Attribute],
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.{HiveTableRelation, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
@@ -49,9 +50,13 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
}

plan.transform {
case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(_, attrs, filters, rel)) =>
case a @ Aggregate(_, aggExprs, child @ PhysicalOperation(
projectList, filters, PartitionedRelation(partAttrs, rel))) =>
// We only apply this optimization when only partitioned attributes are scanned.
if (a.references.subsetOf(attrs)) {
if (AttributeSet((projectList ++ filters).flatMap(_.references)).subsetOf(partAttrs)) {
// The project list and filters all only refer to partition attributes, which means
// the Aggregate operator can also only refer to partition attributes, and filters are
// all partition filters. This is a metadata only query we can optimize.
val aggFunctions = aggExprs.flatMap(_.collect {
case agg: AggregateExpression => agg
})
@@ -102,7 +107,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
partFilters: Seq[Expression]): LogicalPlan = {
// this logic comes from PruneFileSourcePartitions. it ensures that the filter names match the
// relation's schema. PartitionedRelation ensures that the filters only reference partition cols
val relFilters = partFilters.map { e =>
val normalizedFilters = partFilters.map { e =>
e transform {
case a: AttributeReference =>
a.withName(relation.output.find(_.semanticEquals(a)).get.name)
@@ -114,11 +119,8 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
relation match {
case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, isStreaming) =>
val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
val partitionData = fsRelation.location.listFiles(relFilters, Nil)
// partition data may be a stream, which can cause serialization to hit stack level too
// deep exceptions because it is a recursive structure in memory. converting to array
// avoids the problem.
Contributor Author:
I believe this is already fixed in https://issues.apache.org/jira/browse/SPARK-21884

Contributor:
Yes, that does fix it, but in a non-obvious way. What isn't clear is what guarantees that the rows used to construct the LocalRelation will never need to be serialized. Would it be reasonable for a future commit to remove the @transient modifier and re-introduce the problem?

I would rather this return the data in a non-recursive structure, but it's a minor point.

Contributor Author:
> Would it be reasonable for a future commit to remove the @transient modifier and re-introduce the problem?

That's very unlikely. SPARK-21884 guarantees Spark won't serialize the rows, and we have regression tests to protect us. BTW it would be a lot of work to make sure all the places that create LocalRelation do not use a recursive structure. I'll add some comments to LocalRelation to emphasize it.
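
The point under discussion can be illustrated with a minimal, self-contained sketch (hypothetical class and object names, not Spark's): a field marked @transient is skipped by Java serialization, so an object holding a recursive Seq such as a Stream can be serialized without ever walking that structure, which is what the removed toArray call used to guard against.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical stand-ins for illustration only.
case class PartitionRow(values: Array[Any])

class PartitionDataHolder(@transient val rows: Seq[PartitionRow]) extends Serializable {
  // Only this small summary travels with the serialized object;
  // the possibly Stream-backed, recursive `rows` stay behind.
  val numRows: Int = rows.size
}

object TransientSketch {
  def main(args: Array[String]): Unit = {
    // A Stream is a recursive cons structure in memory; serializing it directly is
    // what the removed comment warned could hit "stack level too deep" exceptions.
    val rows: Stream[PartitionRow] = Stream.tabulate(100000)(i => PartitionRow(Array(i)))
    val holder = new PartitionDataHolder(rows)

    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(holder) // succeeds: the @transient field is skipped entirely
    println(s"serialized a holder of ${holder.numRows} rows without touching the Stream")
  }
}
```

Whether this mirrors the exact SPARK-21884 change is an assumption; the sketch only illustrates why the @transient markers and the comments added in this PR matter.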

LocalRelation(partAttrs, partitionData.map(_.values).toArray, isStreaming)
val partitionData = fsRelation.location.listFiles(normalizedFilters, Nil)
LocalRelation(partAttrs, partitionData.map(_.values), isStreaming)

case relation: HiveTableRelation =>
val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
Expand All @@ -127,7 +129,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
val timeZoneId = caseInsensitiveProperties.get(DateTimeUtils.TIMEZONE_OPTION)
.getOrElse(SQLConf.get.sessionLocalTimeZone)
val partitions = if (partFilters.nonEmpty) {
catalog.listPartitionsByFilter(relation.tableMeta.identifier, relFilters)
catalog.listPartitionsByFilter(relation.tableMeta.identifier, normalizedFilters)
} else {
catalog.listPartitions(relation.tableMeta.identifier)
}
Expand All @@ -137,10 +139,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
Cast(Literal(p.spec(attr.name)), attr.dataType, Option(timeZoneId)).eval()
})
}
// partition data may be a stream, which can cause serialization to hit stack level too
// deep exceptions because it is a recursive structure in memory. converting to array
// avoids the problem.
LocalRelation(partAttrs, partitionData.toArray)
LocalRelation(partAttrs, partitionData)

case _ =>
throw new IllegalStateException(s"unrecognized table scan node: $relation, " +
@@ -151,44 +150,21 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic

/**
* A pattern that finds the partitioned table relation node inside the given plan, and returns a
* pair of the partition attributes, partition filters, and the table relation node.
*
* It keeps traversing down the given plan tree if there is a [[Project]] or [[Filter]] with
* deterministic expressions, and returns result after reaching the partitioned table relation
* node.
* pair of the partition attributes and the table relation node.
*/
object PartitionedRelation extends PredicateHelper {

def unapply(
plan: LogicalPlan): Option[(AttributeSet, AttributeSet, Seq[Expression], LogicalPlan)] = {
def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = {
plan match {
case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
if fsRelation.partitionSchema.nonEmpty =>
if fsRelation.partitionSchema.nonEmpty =>
val partAttrs = AttributeSet(getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l))
Some((partAttrs, partAttrs, Nil, l))
Some((partAttrs, l))

case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
val partAttrs = AttributeSet(
getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation))
Some((partAttrs, partAttrs, Nil, relation))

case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
unapply(child).flatMap { case (partAttrs, attrs, filters, relation) =>
if (p.references.subsetOf(attrs)) {
Some((partAttrs, p.outputSet, filters, relation))
} else {
None
}
}

case f @ Filter(condition, child) if condition.deterministic =>
unapply(child).flatMap { case (partAttrs, attrs, filters, relation) =>
if (f.references.subsetOf(partAttrs)) {
Some((partAttrs, attrs, splitConjunctivePredicates(condition) ++ filters, relation))
} else {
None
}
}
Some((partAttrs, relation))

case _ => None
}
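To make the shape of the new match easier to follow, here is a toy sketch with stand-in classes (deliberately not Spark's real PhysicalOperation or PartitionedRelation, whose signatures are richer): one extractor collapses a chain of Project/Filter nodes above a leaf, the other recognizes a partitioned leaf and exposes its partition columns, and the rule fires only when everything referenced is a partition column.

```scala
// Toy plan hierarchy, for illustration only.
sealed trait Plan
case class Relation(partitionCols: Set[String], otherCols: Set[String]) extends Plan
case class Project(columns: Set[String], child: Plan) extends Plan
case class Filter(referencedCols: Set[String], child: Plan) extends Plan

object Operation {
  // Collapse any Project/Filter chain over a leaf, collecting projected and filtered
  // columns (a much-simplified analogue of PhysicalOperation).
  def unapply(plan: Plan): Option[(Set[String], Set[String], Plan)] = plan match {
    case Project(cols, Operation(_, filt, leaf))   => Some((cols, filt, leaf))
    case Filter(cols, Operation(proj, filt, leaf)) => Some((proj, filt ++ cols, leaf))
    case leaf: Relation                            => Some((Set.empty[String], Set.empty[String], leaf))
    case _                                         => None
  }
}

object Partitioned {
  // Recognize only the relation itself and return its partition columns,
  // like the simplified PartitionedRelation above.
  def unapply(plan: Plan): Option[(Set[String], Relation)] = plan match {
    case r: Relation if r.partitionCols.nonEmpty => Some((r.partitionCols, r))
    case _                                       => None
  }
}

object MetadataOnlySketch {
  // The optimization applies only when every referenced column is a partition column.
  def isMetadataOnly(plan: Plan): Boolean = plan match {
    case Operation(proj, filt, Partitioned(partCols, _)) => (proj ++ filt).subsetOf(partCols)
    case _                                               => false
  }

  def main(args: Array[String]): Unit = {
    val rel = Relation(partitionCols = Set("part"), otherCols = Set("id", "data"))
    println(isMetadataOnly(Project(Set("part"), Filter(Set("part"), rel)))) // true
    println(isMetadataOnly(Project(Set("part"), Filter(Set("id"), rel))))   // false
  }
}
```

In the real rule, the Aggregate's references are constrained by the project list that PhysicalOperation returns, which is why the subsetOf check over projectList ++ filters in the diff above is sufficient.
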
@@ -24,6 +24,7 @@ import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.plans.logical.{Distinct, Filter, Project, SubqueryAlias}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf.OPTIMIZER_METADATA_ONLY
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

@@ -32,13 +33,22 @@ class OptimizeHiveMetadataOnlyQuerySuite extends QueryTest with TestHiveSingleto

import spark.implicits._

before {
override def beforeAll(): Unit = {
Contributor Author:
Make this test suite follow the existing style in OptimizeMetadataOnlyQuerySuite.

super.beforeAll()
sql("CREATE TABLE metadata_only (id bigint, data string) PARTITIONED BY (part int)")
(0 to 10).foreach(p => sql(s"ALTER TABLE metadata_only ADD PARTITION (part=$p)"))
}

override protected def afterAll(): Unit = {
try {
sql("DROP TABLE IF EXISTS metadata_only")
} finally {
super.afterAll()
}
}

test("SPARK-23877: validate metadata-only query pushes filters to metastore") {
withTable("metadata_only") {
withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true") {
val startCount = HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount

// verify the number of matching partitions
@@ -50,7 +60,7 @@ class OptimizeHiveMetadataOnlyQuerySuite extends QueryTest with TestHiveSingleto
}

test("SPARK-23877: filter on projected expression") {
withTable("metadata_only") {
withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true") {
val startCount = HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount

// verify the matching partitions
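
For orientation, a hedged usage sketch of the kind of query these tests exercise (the session setup is assumed, and the metadata_only table is the one created in beforeAll above): with spark.sql.optimizer.metadataOnly enabled, a query whose projection and filter touch only the partition column can be answered from the metastore rather than by scanning files.

```scala
import org.apache.spark.sql.SparkSession

object MetadataOnlyQuerySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("metadata-only-sketch")
      .enableHiveSupport()                                 // the test table lives in the Hive metastore
      .config("spark.sql.optimizer.metadataOnly", "true")  // OPTIMIZER_METADATA_ONLY
      .getOrCreate()

    // Both the projected expression (part + 1) and the filter (part > 5) reference only the
    // partition column, so OptimizeMetadataOnlyQuery can replace the table scan with a
    // LocalRelation built from partition metadata, without reading any data files.
    spark.sql("SELECT DISTINCT part + 1 FROM metadata_only WHERE part > 5").show()

    spark.stop()
  }
}
```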