Skip to content

Commit

Permalink
[SPARK-32817][SQL] DPP throws error when broadcast side is empty
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

In `SubqueryBroadcastExec.relationFuture`, if the `broadcastRelation` is an `EmptyHashedRelation`, then `broadcastRelation.keys()` will throw `UnsupportedOperationException`.

### Why are the changes needed?

To fix a bug.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a new test.

Closes #29671 from wzhfy/dpp_empty_broadcast.

Authored-by: Zhenhua Wang <wzh_zju@163.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
  • Loading branch information
wzhfy authored and maropu committed Sep 8, 2020
1 parent bd3dc2f commit e7d9a24
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 2 deletions.
Expand Up @@ -1091,7 +1091,7 @@ case object EmptyHashedRelation extends HashedRelation {
override def keyIsUnique: Boolean = true

override def keys(): Iterator[InternalRow] = {
throw new UnsupportedOperationException
Iterator.empty
}

override def close(): Unit = {}
Expand Down
Expand Up @@ -1344,6 +1344,23 @@ abstract class DynamicPartitionPruningSuiteBase
}
}
}

test("SPARK-32817: DPP throws error when the broadcast side is empty") {
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
"""
|SELECT * FROM fact_sk f
|JOIN dim_store s
|ON f.store_id = s.store_id WHERE s.country = 'XYZ'
""".stripMargin)

checkPartitionPruningPredicate(df, false, true)

checkAnswer(df, Nil)
}
}
}

class DynamicPartitionPruningSuiteAEOff extends DynamicPartitionPruningSuiteBase {
Expand Down
Expand Up @@ -621,7 +621,7 @@ class HashedRelationSuite extends SharedSparkSession {
}
}

test("EmptyHashedRelation return null in get / getValue") {
test("EmptyHashedRelation override methods behavior test") {
val buildKey = Seq(BoundReference(0, LongType, false))
val hashed = HashedRelation(Seq.empty[InternalRow].toIterator, buildKey, 1, mm)
assert(hashed == EmptyHashedRelation)
Expand All @@ -631,6 +631,10 @@ class HashedRelationSuite extends SharedSparkSession {
assert(hashed.get(key) == null)
assert(hashed.getValue(0L) == null)
assert(hashed.getValue(key) == null)

assert(hashed.keys().isEmpty)
assert(hashed.keyIsUnique)
assert(hashed.estimatedSize == 0)
}

test("SPARK-32399: test methods related to key index") {
Expand Down

0 comments on commit e7d9a24

Please sign in to comment.