Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-34538][SQL] Hive Metastore support filter by not-in #31646

Closed
wants to merge 15 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,9 @@ object SQLConf {
.doc("The threshold of set size for InSet predicate when pruning partitions through Hive " +
"Metastore. When the set size exceeds the threshold, we rewrite the InSet predicate " +
"to be greater than or equal to the minimum value in set and less than or equal to the " +
"maximum value in set. Larger values may cause Hive Metastore stack overflow.")
"maximum value in set. Larger values may cause Hive Metastore stack overflow. But for " +
"the predicate of Not InSet which values exceeds the threshold, we won't push it to " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But for InSet inside Not with values exceeding ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

"Hive Metastore.")
.version("3.1.0")
.internal()
.intConf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,15 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
values.map(value => s"$name = $value").mkString("(", " or ", ")")
}

def convertNotInToAnd(name: String, values: Seq[String]): String = {
values.map(value => s"$name != $value").mkString("(", " and ", ")")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More than 10,000 values will cause the Hive Metastore stack overflow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about stoping push it If it's values size exceeds the threshold ? In this case it can not be covert like >= and <=.

}

def hasNullLiteral(list: Seq[Expression]): Boolean = list.exists {
case Literal(null, _) => true
case _ => false
}

val useAdvanced = SQLConf.get.advancedPartitionPredicatePushdownEnabled
val inSetThreshold = SQLConf.get.metastorePartitionPruningInSetThreshold

Expand All @@ -763,10 +772,17 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
}

def convert(expr: Expression): Option[String] = expr match {
case Not(In(_, list)) if hasNullLiteral(list) => None
case Not(InSet(_, list)) if list.contains(null) => None

case In(ExtractAttribute(SupportedAttribute(name)), ExtractableLiterals(values))
if useAdvanced =>
Some(convertInToOr(name, values))

case Not(In(ExtractAttribute(SupportedAttribute(name)), ExtractableLiterals(values)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not(a IN (null, 2)): if a = 1, the final result is null.
a != 2: if a = 1, the final result is true

I think this rewrite is incorrect. We need to make sure the values of IN are all not null.

Copy link
Contributor Author

@ulysses-you ulysses-you Mar 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, missed this. For Not(InSet) case, null value can not be skip safely, it should convert to and(xx != null).

if useAdvanced =>
Some(convertNotInToAnd(name, values))

case InSet(child, values) if useAdvanced && values.size > inSetThreshold =>
val dataType = child.dataType
// Skip null here is safe, more details could see at ExtractableLiterals.
Expand All @@ -775,14 +791,25 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
convert(And(GreaterThanOrEqual(child, Literal(sortedValues.head, dataType)),
LessThanOrEqual(child, Literal(sortedValues.last, dataType))))

case Not(InSet(_, values)) if values.size > inSetThreshold =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move this to the beginning so that unsupported cases are grouped together?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved.

None

case InSet(child @ ExtractAttribute(SupportedAttribute(name)), ExtractableDateValues(values))
if useAdvanced && child.dataType == DateType =>
Some(convertInToOr(name, values))

case Not(InSet(child @ ExtractAttribute(SupportedAttribute(name)),
ExtractableDateValues(values))) if useAdvanced && child.dataType == DateType =>
Some(convertNotInToAnd(name, values))

case InSet(ExtractAttribute(SupportedAttribute(name)), ExtractableValues(values))
if useAdvanced =>
Some(convertInToOr(name, values))

case Not(InSet(ExtractAttribute(SupportedAttribute(name)), ExtractableValues(values)))
if useAdvanced =>
Some(convertNotInToAnd(name, values))

case op @ SpecialBinaryComparison(
ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) =>
Some(s"$name ${op.symbol} $value")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,52 @@ class FiltersSuite extends SparkFunSuite with Logging with PlanTest {
(a("datecol", DateType) =!= Literal(Date.valueOf("2019-01-01"))) :: Nil,
"datecol != 2019-01-01")

filterTest("not-in int filter",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we add a test case for NULL as while, e.g. NOT IN (1, 2, ..., NULL)? Given we have bug before.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it should be, add the null value for test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems exists some issue about null value. Created #31659.

(Not(In(a("intcol", IntegerType), Seq(Literal(1), Literal(2))))) :: Nil,
"(intcol != 1 and intcol != 2)")

filterTest("not-in int filter with null",
(Not(In(a("intcol", IntegerType), Seq(Literal(1), Literal(2), Literal(null))))) :: Nil,
"")

filterTest("not-in string filter",
(Not(In(a("strcol", StringType), Seq(Literal("a"), Literal("b"))))) :: Nil,
"""(strcol != "a" and strcol != "b")""")

filterTest("not-in string filter with null",
(Not(In(a("strcol", StringType), Seq(Literal("a"), Literal("b"), Literal(null))))) :: Nil,
"")

filterTest("not-inset, int filter",
(Not(InSet(a("intcol", IntegerType), Set(1, 2)))) :: Nil,
"(intcol != 1 and intcol != 2)")

filterTest("not-inset, int filter with null",
(Not(InSet(a("intcol", IntegerType), Set(1, 2, null)))) :: Nil,
"")

filterTest("not-inset, string filter",
(Not(InSet(a("strcol", StringType), Set(Literal("a").eval(), Literal("b").eval())))) :: Nil,
"""(strcol != "a" and strcol != "b")""")

filterTest("not-inset, string filter with null",
(Not(InSet(a("strcol", StringType),
Set(Literal("a").eval(), Literal("b").eval(), Literal(null).eval())))) :: Nil,
"")

filterTest("not-inset, date filter",
cloud-fan marked this conversation as resolved.
Show resolved Hide resolved
(Not(InSet(a("datecol", DateType),
Set(Literal(Date.valueOf("2020-01-01")).eval(),
Literal(Date.valueOf("2020-01-02")).eval())))) :: Nil,
"""(datecol != 2020-01-01 and datecol != 2020-01-02)""")

filterTest("not-inset, date filter with null",
(Not(InSet(a("datecol", DateType),
Set(Literal(Date.valueOf("2020-01-01")).eval(),
Literal(Date.valueOf("2020-01-02")).eval(),
Literal(null).eval())))) :: Nil,
"")

cloud-fan marked this conversation as resolved.
Show resolved Hide resolved
// Applying the predicate `x IN (NULL)` should return an empty set, but since this optimization
// will be applied by Catalyst, this filter converter does not need to account for this.
filterTest("SPARK-24879 IN predicates with only NULLs will not cause a NPE",
Expand Down Expand Up @@ -187,6 +233,14 @@ class FiltersSuite extends SparkFunSuite with Logging with PlanTest {
}
}

test("Don't push not inset if it's values exceeds the threshold") {
withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_INSET_THRESHOLD.key -> "2") {
val filter = Not(InSet(a("p", IntegerType), Set(1, 2, 3)))
val converted = shim.convertFilters(testTable, Seq(filter), conf.sessionLocalTimeZone)
assert(converted.isEmpty)
}
}

test("SPARK-34538: Skip InSet null value during push filter to Hive metastore") {
withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_INSET_THRESHOLD.key -> "3") {
val intFilter = InSet(a("p", IntegerType), Set(null, 1, 2))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,52 @@ class HivePartitionFilteringSuite(version: String)
dateStrValue)
}

test("getPartitionsByFilter: not in chunk") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the effective test that can catch correctness bugs. Does it test null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The coverage of previous test is not good. I rewrite the test and include you mentioned.

testMetastorePartitionFiltering(
Not(In(attr("chunk"), Seq(Literal("aa"), Literal("ab")))),
dsValue,
hValue,
Seq("ba", "bb"),
dateValue,
dateStrValue
)
}

test("getPartitionsByFilter: not in ds") {
testMetastorePartitionFiltering(
Not(In(attr("ds"), Seq(Literal(20170102)))),
Seq(20170101, 20170103),
hValue,
chunkValue,
dateValue,
dateStrValue
)
}

test("getPartitionsByFilter: not inset date") {
testMetastorePartitionFiltering(
Not(InSet(attr("d"),
Set(Literal(Date.valueOf("2019-01-01")).eval(),
Literal(Date.valueOf("2019-01-02")).eval()))),
dsValue,
hValue,
chunkValue,
Seq("2019-01-03"),
dateStrValue
)
}

test("getPartitionsByFilter: not inset h") {
testMetastorePartitionFiltering(
Not(InSet(attr("h"), Set(1, 2))),
dsValue,
Seq(0, 3, 4),
chunkValue,
dateValue,
dateStrValue
)
}

test("getPartitionsByFilter: cast(datestr as date)= 2020-01-01") {
testMetastorePartitionFiltering(
attr("datestr").cast(DateType) === Date.valueOf("2020-01-01"),
Expand Down