HiveTableScanExec.scala

@@ -146,7 +146,7 @@ case class HiveTableScanExec(
    * @param partitions All partitions of the relation.
    * @return Partitions that are involved in the query plan.
    */
-  private[hive] def prunePartitions(partitions: Seq[HivePartition]) = {
+  private[hive] def prunePartitions(partitions: Seq[HivePartition]): Seq[HivePartition] = {
     boundPruningPred match {
       case None => partitions
       case Some(shouldKeep) => partitions.filter { part =>
@@ -162,18 +162,36 @@ case class HiveTableScanExec(
     }
   }

+  @transient lazy val prunedPartitions: Seq[HivePartition] = {
+    if (relation.prunedPartitions.nonEmpty) {
+      val hivePartitions =
+        relation.prunedPartitions.get.map(HiveClientImpl.toHivePartition(_, hiveQlTable))
+      if (partitionPruningPred.forall(!ExecSubqueryExpression.hasSubquery(_))) {
+        hivePartitions
+      } else {
+        prunePartitions(hivePartitions)
+      }
+    } else {
+      if (sparkSession.sessionState.conf.metastorePartitionPruning &&
+          partitionPruningPred.nonEmpty) {
+        rawPartitions
+      } else {
+        prunePartitions(rawPartitions)
+      }
+    }
+  }
+
   // exposed for tests
-  @transient lazy val rawPartitions = {
+  @transient lazy val rawPartitions: Seq[HivePartition] = {

Contributor: when we call rawPartitions, the relation.prunedPartitions must be empty. We can remove relation.prunedPartitions.getOrElse below.

Contributor (Author): sure, removed.
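
To make the reviewer's point concrete, here is a minimal, self-contained sketch of the call flow under discussion. Partition, relationPrunedPartitions, and listPartitionsByFilter are simplified stand-ins rather than Spark's real types, and the subquery and configuration checks of the actual code are omitted: prunedPartitions consults the relation's pre-pruned partitions first and only falls back to rawPartitions when that Option is empty, so a getOrElse inside rawPartitions could never supply a value.

object PruningFlowSketch {
  case class Partition(values: Seq[String])

  // Stand-in for HiveTableRelation.prunedPartitions.
  val relationPrunedPartitions: Option[Seq[Partition]] = None

  // Pretend metastore lookup.
  def listPartitionsByFilter(): Seq[Partition] =
    Seq(Partition(Seq("2008-04-08")))

  lazy val rawPartitions: Seq[Partition] =
    // Wrapping this in relationPrunedPartitions.getOrElse(...) would be dead
    // code: this val is only reached when the Option is already None.
    listPartitionsByFilter()

  lazy val prunedPartitions: Seq[Partition] = relationPrunedPartitions match {
    case Some(parts) => parts  // the relation already carries pruned partitions
    case None => rawPartitions // otherwise fall back to a metastore lookup
  }

  def main(args: Array[String]): Unit = {
    println(prunedPartitions) // List(Partition(List(2008-04-08)))
  }
}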

     val prunedPartitions =
       if (sparkSession.sessionState.conf.metastorePartitionPruning &&
           partitionPruningPred.nonEmpty) {
         // Retrieve the original attributes based on expression ID so that capitalization matches.
         val normalizedFilters = partitionPruningPred.map(_.transform {
           case a: AttributeReference => originalAttributes(a)
         })
-        relation.prunedPartitions.getOrElse(
-          sparkSession.sessionState.catalog
-            .listPartitionsByFilter(relation.tableMeta.identifier, normalizedFilters))
+        sparkSession.sessionState.catalog
+          .listPartitionsByFilter(relation.tableMeta.identifier, normalizedFilters)
       } else {
         sparkSession.sessionState.catalog.listPartitions(relation.tableMeta.identifier)
       }
@@ -189,7 +207,7 @@ case class HiveTableScanExec(
       }
     } else {
       Utils.withDummyCallSite(sqlContext.sparkContext) {
-        hadoopReader.makeRDDForPartitionedTable(prunePartitions(rawPartitions))
+        hadoopReader.makeRDDForPartitionedTable(prunedPartitions)
       }
     }
     val numOutputRows = longMetric("numOutputRows")

PruningSuite.scala

@@ -161,7 +161,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
         case p @ HiveTableScanExec(columns, relation, _) =>
           val columnNames = columns.map(_.name)
           val partValues = if (relation.isPartitioned) {
-            p.prunePartitions(p.rawPartitions).map(_.getValues)
+            p.prunedPartitions.map(_.getValues)
           } else {
             Seq.empty
           }