[SPARK-32788][SQL] non-partitioned table scan should not have partition filter #29637

Closed · wants to merge 2 commits · Changes from 1 commit
@@ -211,9 +211,6 @@ case class FileSourceScanExec(
     val ret =
       relation.location.listFiles(
         partitionFilters.filterNot(isDynamicPruningFilter), dataFilters)
-    if (relation.partitionSchemaOption.isDefined) {
-      driverMetrics("numPartitions") = ret.length
-    }
     setFilesNumAndSizeMetric(ret, true)
     val timeTakenMs = NANOSECONDS.toMillis(
       (System.nanoTime() - startTime) + optimizerMetadataTimeNs)
@@ -241,7 +238,6 @@ case class FileSourceScanExec(
       setFilesNumAndSizeMetric(ret, false)
       val timeTakenMs = (System.nanoTime() - startTime) / 1000 / 1000
       driverMetrics("pruningTime") = timeTakenMs
-      driverMetrics("numPartitions") = ret.length
       ret
     } else {
       selectedPartitions
@@ -439,6 +435,9 @@ case class FileSourceScanExec(
       driverMetrics("staticFilesNum") = filesNum
       driverMetrics("staticFilesSize") = filesSize
     }
+    if (relation.partitionSchemaOption.isDefined) {
+      driverMetrics("numPartitions") = partitions.length
+    }
   }

Contributor Author:
This change is not needed, but I keep it here to: 1. avoid duplicated code, and 2. make it extra safe that we only update numPartitions when the table is partitioned.
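The consolidation above can be sketched in plain Scala. This is a simplified model, not Spark's actual `FileSourceScanExec` API: the Option-typed schema parameter and the metric keys stand in for the real fields. It shows the point of the refactoring: the `numPartitions` metric is now updated in exactly one place, guarded by whether the table has a partition schema.

```scala
// Minimal sketch (plain Scala, not the Spark API) of the refactoring above:
// both call sites delegate to one helper, which updates numPartitions only
// when the table is partitioned.
object MetricSketch {
  val driverMetrics = scala.collection.mutable.Map.empty[String, Long]

  // partitionSchema is None for a non-partitioned table (assumed shape).
  def setFilesNumAndSizeMetric(
      partitions: Seq[String],
      partitionSchema: Option[String]): Unit = {
    driverMetrics("filesNum") = partitions.length.toLong
    if (partitionSchema.isDefined) {
      // Only a partitioned scan reports numPartitions.
      driverMetrics("numPartitions") = partitions.length.toLong
    }
  }

  def main(args: Array[String]): Unit = {
    setFilesNumAndSizeMetric(Seq("f1", "f2"), partitionSchema = None)
    assert(!driverMetrics.contains("numPartitions"))
    setFilesNumAndSizeMetric(Seq("p1", "p2"), partitionSchema = Some("dt"))
    assert(driverMetrics("numPartitions") == 2L)
    println("ok")
  }
}
```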

override lazy val metrics = Map(
@@ -154,9 +154,12 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
       l.resolve(
         fsRelation.partitionSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
     val partitionSet = AttributeSet(partitionColumns)
-    val partitionKeyFilters =
+    val partitionKeyFilters = if (partitionColumns.isEmpty) {
+      ExpressionSet(Nil)
+    } else {
       ExpressionSet(normalizedFilters
         .filter(_.references.subsetOf(partitionSet)))
+    }

     logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
Member:
nit: Is this logging still meaningful in the non-partitioned case? I mean, could we move it inside the else block at lines 159-162?
Contributor Author:
Makes sense.
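The `partitionColumns.isEmpty` guard matters because subset testing is vacuously true for an empty reference set: a predicate such as `WHERE (SELECT true)` references no columns, so its (empty) reference set is a subset of the (empty) partition-column set, and it would be misclassified as a partition filter on a non-partitioned table. A minimal sketch in plain Scala, where sets of column names stand in for Spark's `AttributeSet`/`ExpressionSet` (simplified model, not the real API):

```scala
// Sketch of why a reference-free predicate slips through subset-based
// partition-filter classification, and how the isEmpty guard fixes it.
object PartitionFilterSketch {
  // A filter counts as a partition filter when every attribute it
  // references belongs to the partition-column set.
  def isPartitionFilter(filterRefs: Set[String], partitionCols: Set[String]): Boolean =
    filterRefs.subsetOf(partitionCols)

  def partitionKeyFilters(
      filters: Seq[Set[String]],
      partitionCols: Set[String]): Seq[Set[String]] =
    if (partitionCols.isEmpty) Seq.empty // the fix: nothing can be a partition filter
    else filters.filter(f => isPartitionFilter(f, partitionCols))

  def main(args: Array[String]): Unit = {
    val noCols = Set.empty[String]       // non-partitioned table
    val subqueryRefs = Set.empty[String] // WHERE (SELECT true) references no columns

    // The empty set is a subset of every set, so without the guard the
    // filter would be misclassified as a partition filter.
    assert(isPartitionFilter(subqueryRefs, noCols))
    // With the guard, no filter is treated as a partition filter.
    assert(partitionKeyFilters(Seq(subqueryRefs), noCols).isEmpty)
    println("ok")
  }
}
```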
@@ -3684,6 +3684,13 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     checkAnswer(sql("SELECT 0 FROM ( SELECT * FROM B JOIN C USING (id)) " +
       "JOIN ( SELECT * FROM B JOIN C USING (id)) USING (id)"), Row(0))
   }
+
+  test("SPARK-32788: non-partitioned table scan should not have partition filter") {
+    withTable("t") {
+      spark.range(1).write.saveAsTable("t")
+      checkAnswer(sql("SELECT id FROM t WHERE (SELECT true)"), Row(0L))
+    }
+  }
 }

Member:
A bit surprised that this query failed... nice catch.

case class Foo(bar: Option[String])