[SPARK-34538][SQL] Hive Metastore support filter by not-in
### What changes were proposed in this pull request?

Add `Not(In)` and `Not(InSet)` patterns when converting filters for the metastore.

### Why are the changes needed?

`NOT IN` is a useful condition for pruning partitions, so it is worth supporting.

Technically, we can convert `c not in (x, y)` to `c != x and c != y` and push it to the metastore.

To avoid metastore overflow and respect the config `spark.sql.hive.metastorePartitionPruningInSetThreshold`, a `Not(InSet)` predicate is not pushed to the metastore if its number of values exceeds the threshold.
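
For illustration, a minimal standalone sketch of this rewrite (the helper mirrors the `convertNotInToAnd` function added in this PR; the wrapping object and `main` exist only for the example):

```scala
object NotInRewriteSketch {
  // Rewrite `name NOT IN (v1, v2, ...)` into the filter string the Hive Metastore
  // understands: "(name != v1 and name != v2 and ...)".
  def convertNotInToAnd(name: String, values: Seq[String]): String =
    values.map(value => s"$name != $value").mkString("(", " and ", ")")

  def main(args: Array[String]): Unit = {
    // `chunk NOT IN ("aa", "ab")` becomes: (chunk != "aa" and chunk != "ab")
    println(convertNotInToAnd("chunk", Seq("\"aa\"", "\"ab\"")))
    // A Not(InSet) whose value count exceeds the threshold, or a NOT IN list that
    // contains a null literal, is simply not converted (see the diff below).
  }
}
```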

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Add test.

Closes #31646 from ulysses-you/SPARK-34538.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
ulysses-you authored and cloud-fan committed Mar 11, 2021
1 parent da086a8 commit 744a73d
Showing 4 changed files with 149 additions and 1 deletion.
@@ -863,7 +863,9 @@ object SQLConf {
.doc("The threshold of set size for InSet predicate when pruning partitions through Hive " +
"Metastore. When the set size exceeds the threshold, we rewrite the InSet predicate " +
"to be greater than or equal to the minimum value in set and less than or equal to the " +
"maximum value in set. Larger values may cause Hive Metastore stack overflow.")
"maximum value in set. Larger values may cause Hive Metastore stack overflow. But for " +
"InSet inside Not with values exceeding the threshold, we won't push it to Hive Metastore."
)
.version("3.1.0")
.internal()
.intConf
@@ -748,6 +748,15 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
values.map(value => s"$name = $value").mkString("(", " or ", ")")
}

def convertNotInToAnd(name: String, values: Seq[String]): String = {
values.map(value => s"$name != $value").mkString("(", " and ", ")")
}

def hasNullLiteral(list: Seq[Expression]): Boolean = list.exists {
case Literal(null, _) => true
case _ => false
}

val useAdvanced = SQLConf.get.advancedPartitionPredicatePushdownEnabled
val inSetThreshold = SQLConf.get.metastorePartitionPruningInSetThreshold

@@ -763,10 +772,20 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
}

def convert(expr: Expression): Option[String] = expr match {
case Not(InSet(_, values)) if values.size > inSetThreshold =>
None

case Not(In(_, list)) if hasNullLiteral(list) => None
case Not(InSet(_, list)) if list.contains(null) => None

case In(ExtractAttribute(SupportedAttribute(name)), ExtractableLiterals(values))
if useAdvanced =>
Some(convertInToOr(name, values))

case Not(In(ExtractAttribute(SupportedAttribute(name)), ExtractableLiterals(values)))
if useAdvanced =>
Some(convertNotInToAnd(name, values))

case InSet(child, values) if useAdvanced && values.size > inSetThreshold =>
val dataType = child.dataType
// Skipping null here is safe; see ExtractableLiterals for details.
@@ -779,10 +798,18 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
if useAdvanced && child.dataType == DateType =>
Some(convertInToOr(name, values))

case Not(InSet(child @ ExtractAttribute(SupportedAttribute(name)),
ExtractableDateValues(values))) if useAdvanced && child.dataType == DateType =>
Some(convertNotInToAnd(name, values))

case InSet(ExtractAttribute(SupportedAttribute(name)), ExtractableValues(values))
if useAdvanced =>
Some(convertInToOr(name, values))

case Not(InSet(ExtractAttribute(SupportedAttribute(name)), ExtractableValues(values)))
if useAdvanced =>
Some(convertNotInToAnd(name, values))

case op @ SpecialBinaryComparison(
ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) =>
Some(s"$name ${op.symbol} $value")
@@ -108,6 +108,47 @@ class FiltersSuite extends SparkFunSuite with Logging with PlanTest {
(a("datecol", DateType) =!= Literal(Date.valueOf("2019-01-01"))) :: Nil,
"datecol != 2019-01-01")

filterTest("not-in, string filter",
(Not(In(a("strcol", StringType), Seq(Literal("a"), Literal("b"))))) :: Nil,
"""(strcol != "a" and strcol != "b")""")

filterTest("not-in, string filter with null",
(Not(In(a("strcol", StringType), Seq(Literal("a"), Literal("b"), Literal(null))))) :: Nil,
"")

filterTest("not-in, date filter",
(Not(In(a("datecol", DateType),
Seq(Literal(Date.valueOf("2021-01-01")), Literal(Date.valueOf("2021-01-02")))))) :: Nil,
"""(datecol != 2021-01-01 and datecol != 2021-01-02)""")

filterTest("not-in, date filter with null",
(Not(In(a("datecol", DateType),
Seq(Literal(Date.valueOf("2021-01-01")), Literal(Date.valueOf("2021-01-02")),
Literal(null))))) :: Nil,
"")

filterTest("not-inset, string filter",
(Not(InSet(a("strcol", StringType), Set(Literal("a").eval(), Literal("b").eval())))) :: Nil,
"""(strcol != "a" and strcol != "b")""")

filterTest("not-inset, string filter with null",
(Not(InSet(a("strcol", StringType),
Set(Literal("a").eval(), Literal("b").eval(), Literal(null).eval())))) :: Nil,
"")

filterTest("not-inset, date filter",
(Not(InSet(a("datecol", DateType),
Set(Literal(Date.valueOf("2020-01-01")).eval(),
Literal(Date.valueOf("2020-01-02")).eval())))) :: Nil,
"""(datecol != 2020-01-01 and datecol != 2020-01-02)""")

filterTest("not-inset, date filter with null",
(Not(InSet(a("datecol", DateType),
Set(Literal(Date.valueOf("2020-01-01")).eval(),
Literal(Date.valueOf("2020-01-02")).eval(),
Literal(null).eval())))) :: Nil,
"")

// Applying the predicate `x IN (NULL)` should return an empty set, but since this optimization
// will be applied by Catalyst, this filter converter does not need to account for this.
filterTest("SPARK-24879 IN predicates with only NULLs will not cause a NPE",
@@ -187,6 +228,14 @@ class FiltersSuite extends SparkFunSuite with Logging with PlanTest {
}
}

test("Don't push not inset if it's values exceeds the threshold") {
withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_INSET_THRESHOLD.key -> "2") {
val filter = Not(InSet(a("p", IntegerType), Set(1, 2, 3)))
val converted = shim.convertFilters(testTable, Seq(filter), conf.sessionLocalTimeZone)
assert(converted.isEmpty)
}
}

test("SPARK-34538: Skip InSet null value during push filter to Hive metastore") {
withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_INSET_THRESHOLD.key -> "3") {
val intFilter = InSet(a("p", IntegerType), Set(null, 1, 2))
@@ -418,6 +418,76 @@ class HivePartitionFilteringSuite(version: String)
dateStrValue)
}

test("getPartitionsByFilter: not in/inset string type") {
def check(condition: Expression, result: Seq[String]): Unit = {
testMetastorePartitionFiltering(
condition,
dsValue,
hValue,
result,
dateValue,
dateStrValue
)
}

check(
Not(In(attr("chunk"), Seq(Literal("aa"), Literal("ab")))),
Seq("ba", "bb")
)
check(
Not(In(attr("chunk"), Seq(Literal("aa"), Literal("ab"), Literal(null)))),
chunkValue
)

check(
Not(InSet(attr("chunk"), Set(Literal("aa").eval(), Literal("ab").eval()))),
Seq("ba", "bb")
)
check(
Not(InSet(attr("chunk"), Set("aa", "ab", null))),
chunkValue
)
}

test("getPartitionsByFilter: not in/inset date type") {
def check(condition: Expression, result: Seq[String]): Unit = {
testMetastorePartitionFiltering(
condition,
dsValue,
hValue,
chunkValue,
result,
dateStrValue
)
}

check(
Not(In(attr("d"),
Seq(Literal(Date.valueOf("2019-01-01")),
Literal(Date.valueOf("2019-01-02"))))),
Seq("2019-01-03")
)
check(
Not(In(attr("d"),
Seq(Literal(Date.valueOf("2019-01-01")),
Literal(Date.valueOf("2019-01-02")), Literal(null)))),
dateValue
)

check(
Not(InSet(attr("d"),
Set(Literal(Date.valueOf("2019-01-01")).eval(),
Literal(Date.valueOf("2019-01-02")).eval()))),
Seq("2019-01-03")
)
check(
Not(InSet(attr("d"),
Set(Literal(Date.valueOf("2019-01-01")).eval(),
Literal(Date.valueOf("2019-01-02")).eval(), null))),
dateValue
)
}

test("getPartitionsByFilter: cast(datestr as date)= 2020-01-01") {
testMetastorePartitionFiltering(
attr("datestr").cast(DateType) === Date.valueOf("2020-01-01"),
