diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e225b3a1b3f8..610f436050b0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -863,7 +863,9 @@ object SQLConf {
       .doc("The threshold of set size for InSet predicate when pruning partitions through Hive " +
         "Metastore. When the set size exceeds the threshold, we rewrite the InSet predicate " +
         "to be greater than or equal to the minimum value in set and less than or equal to the " +
-        "maximum value in set. Larger values may cause Hive Metastore stack overflow.")
+        "maximum value in set. Larger values may cause Hive Metastore stack overflow. But an " +
+        "InSet inside Not whose set size exceeds the threshold is not pushed to Hive Metastore."
+      )
       .version("3.1.0")
       .internal()
       .intConf
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index db67480ceb77..2f7fe96013de 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -748,6 +748,15 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
       values.map(value => s"$name = $value").mkString("(", " or ", ")")
     }
 
+    def convertNotInToAnd(name: String, values: Seq[String]): String = {
+      values.map(value => s"$name != $value").mkString("(", " and ", ")")
+    }
+
+    def hasNullLiteral(list: Seq[Expression]): Boolean = list.exists {
+      case Literal(null, _) => true
+      case _ => false
+    }
+
     val useAdvanced = SQLConf.get.advancedPartitionPredicatePushdownEnabled
     val inSetThreshold = SQLConf.get.metastorePartitionPruningInSetThreshold
 
@@ -763,10 +772,20 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
     }
 
     def convert(expr: Expression): Option[String] = expr match {
+      case Not(InSet(_, values)) if values.size > inSetThreshold =>
+        None
+
+      case Not(In(_, list)) if hasNullLiteral(list) => None
+      case Not(InSet(_, list)) if list.contains(null) => None
+
       case In(ExtractAttribute(SupportedAttribute(name)), ExtractableLiterals(values))
           if useAdvanced =>
         Some(convertInToOr(name, values))
 
+      case Not(In(ExtractAttribute(SupportedAttribute(name)), ExtractableLiterals(values)))
+          if useAdvanced =>
+        Some(convertNotInToAnd(name, values))
+
       case InSet(child, values) if useAdvanced && values.size > inSetThreshold =>
         val dataType = child.dataType
         // Skip null here is safe, more details could see at ExtractableLiterals.
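A quick illustration of the string the new `convertNotInToAnd` helper emits (a standalone sketch that only mirrors the helper added above, with made-up example values; not part of the patch):

```scala
// Mirror of the helper added in this patch, runnable outside Spark for illustration.
def convertNotInToAnd(name: String, values: Seq[String]): String =
  values.map(value => s"$name != $value").mkString("(", " and ", ")")

// A NOT IN over two (already quoted) string literals becomes a conjunction of !=,
// which the Hive Metastore can evaluate directly against partition column values.
convertNotInToAnd("chunk", Seq("\"aa\"", "\"ab\""))
// => (chunk != "aa" and chunk != "ab")
```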
@@ -779,10 +798,18 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
           if useAdvanced && child.dataType == DateType =>
         Some(convertInToOr(name, values))
 
+      case Not(InSet(child @ ExtractAttribute(SupportedAttribute(name)),
+          ExtractableDateValues(values))) if useAdvanced && child.dataType == DateType =>
+        Some(convertNotInToAnd(name, values))
+
       case InSet(ExtractAttribute(SupportedAttribute(name)), ExtractableValues(values))
           if useAdvanced =>
         Some(convertInToOr(name, values))
 
+      case Not(InSet(ExtractAttribute(SupportedAttribute(name)), ExtractableValues(values)))
+          if useAdvanced =>
+        Some(convertNotInToAnd(name, values))
+
       case op @ SpecialBinaryComparison(
           ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) =>
         Some(s"$name ${op.symbol} $value")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
index 79b34bd141de..fcdc97325d01 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
@@ -108,6 +108,47 @@ class FiltersSuite extends SparkFunSuite with Logging with PlanTest {
     (a("datecol", DateType) =!= Literal(Date.valueOf("2019-01-01"))) :: Nil,
     "datecol != 2019-01-01")
 
+  filterTest("not-in, string filter",
+    (Not(In(a("strcol", StringType), Seq(Literal("a"), Literal("b"))))) :: Nil,
+    """(strcol != "a" and strcol != "b")""")
+
+  filterTest("not-in, string filter with null",
+    (Not(In(a("strcol", StringType), Seq(Literal("a"), Literal("b"), Literal(null))))) :: Nil,
+    "")
+
+  filterTest("not-in, date filter",
+    (Not(In(a("datecol", DateType),
+      Seq(Literal(Date.valueOf("2021-01-01")), Literal(Date.valueOf("2021-01-02")))))) :: Nil,
+    """(datecol != 2021-01-01 and datecol != 2021-01-02)""")
+
+  filterTest("not-in, date filter with null",
+    (Not(In(a("datecol", DateType),
+      Seq(Literal(Date.valueOf("2021-01-01")), Literal(Date.valueOf("2021-01-02")),
+        Literal(null))))) :: Nil,
+    "")
+
+  filterTest("not-inset, string filter",
+    (Not(InSet(a("strcol", StringType), Set(Literal("a").eval(), Literal("b").eval())))) :: Nil,
+    """(strcol != "a" and strcol != "b")""")
+
+  filterTest("not-inset, string filter with null",
+    (Not(InSet(a("strcol", StringType),
+      Set(Literal("a").eval(), Literal("b").eval(), Literal(null).eval())))) :: Nil,
+    "")
+
+  filterTest("not-inset, date filter",
+    (Not(InSet(a("datecol", DateType),
+      Set(Literal(Date.valueOf("2020-01-01")).eval(),
+        Literal(Date.valueOf("2020-01-02")).eval())))) :: Nil,
+    """(datecol != 2020-01-01 and datecol != 2020-01-02)""")
+
+  filterTest("not-inset, date filter with null",
+    (Not(InSet(a("datecol", DateType),
+      Set(Literal(Date.valueOf("2020-01-01")).eval(),
+        Literal(Date.valueOf("2020-01-02")).eval(),
+        Literal(null).eval())))) :: Nil,
+    "")
+
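A note on why the `with null` variants above expect an empty converted filter: under SQL three-valued logic, `x NOT IN (a, b, NULL)` is never TRUE, so pushing `x != a and x != b` down to the metastore could prune partitions the query still has to scan; the guard cases added in HiveShim return `None` instead and leave the predicate for Spark to evaluate. A minimal self-contained sketch of that logic (the `Option[Boolean]` encoding and helper names are illustrative only):

```scala
// SQL booleans modelled as Option[Boolean]: Some(true), Some(false), None (NULL).
def sqlNot(v: Option[Boolean]): Option[Boolean] = v.map(!_)

def sqlIn(x: Int, set: Seq[Option[Int]]): Option[Boolean] =
  if (set.flatten.contains(x)) Some(true)   // x matches a non-null element
  else if (set.contains(None)) None         // unknown: the NULL element might "match"
  else Some(false)

sqlNot(sqlIn(3, Seq(Some(1), Some(2), None)))  // None -> NOT IN is never TRUE, cannot prune
sqlNot(sqlIn(1, Seq(Some(1), Some(2), None)))  // Some(false) -> the row is filtered anyway
```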
filterTest("SPARK-24879 IN predicates with only NULLs will not cause a NPE", @@ -187,6 +228,14 @@ class FiltersSuite extends SparkFunSuite with Logging with PlanTest { } } + test("Don't push not inset if it's values exceeds the threshold") { + withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_INSET_THRESHOLD.key -> "2") { + val filter = Not(InSet(a("p", IntegerType), Set(1, 2, 3))) + val converted = shim.convertFilters(testTable, Seq(filter), conf.sessionLocalTimeZone) + assert(converted.isEmpty) + } + } + test("SPARK-34538: Skip InSet null value during push filter to Hive metastore") { withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_INSET_THRESHOLD.key -> "3") { val intFilter = InSet(a("p", IntegerType), Set(null, 1, 2)) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala index ebab105eb48c..16e1a415977a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala @@ -418,6 +418,76 @@ class HivePartitionFilteringSuite(version: String) dateStrValue) } + test("getPartitionsByFilter: not in/inset string type") { + def check(condition: Expression, result: Seq[String]): Unit = { + testMetastorePartitionFiltering( + condition, + dsValue, + hValue, + result, + dateValue, + dateStrValue + ) + } + + check( + Not(In(attr("chunk"), Seq(Literal("aa"), Literal("ab")))), + Seq("ba", "bb") + ) + check( + Not(In(attr("chunk"), Seq(Literal("aa"), Literal("ab"), Literal(null)))), + chunkValue + ) + + check( + Not(InSet(attr("chunk"), Set(Literal("aa").eval(), Literal("ab").eval()))), + Seq("ba", "bb") + ) + check( + Not(InSet(attr("chunk"), Set("aa", "ab", null))), + chunkValue + ) + } + + test("getPartitionsByFilter: not in/inset date type") { + def check(condition: Expression, result: Seq[String]): Unit = { + testMetastorePartitionFiltering( + condition, + dsValue, + hValue, + chunkValue, + result, + dateStrValue + ) + } + + check( + Not(In(attr("d"), + Seq(Literal(Date.valueOf("2019-01-01")), + Literal(Date.valueOf("2019-01-02"))))), + Seq("2019-01-03") + ) + check( + Not(In(attr("d"), + Seq(Literal(Date.valueOf("2019-01-01")), + Literal(Date.valueOf("2019-01-02")), Literal(null)))), + dateValue + ) + + check( + Not(InSet(attr("d"), + Set(Literal(Date.valueOf("2019-01-01")).eval(), + Literal(Date.valueOf("2019-01-02")).eval()))), + Seq("2019-01-03") + ) + check( + Not(InSet(attr("d"), + Set(Literal(Date.valueOf("2019-01-01")).eval(), + Literal(Date.valueOf("2019-01-02")).eval(), null))), + dateValue + ) + } + test("getPartitionsByFilter: cast(datestr as date)= 2020-01-01") { testMetastorePartitionFiltering( attr("datestr").cast(DateType) === Date.valueOf("2020-01-01"),