From 8c370987710e0ee11af297b66c936baa823bd86e Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 6 Nov 2017 23:06:49 -0800 Subject: [PATCH 1/3] fix. --- .../spark/sql/hive/client/HiveShim.scala | 24 ++++++++++++------- .../sql/hive/client/HiveClientSuite.scala | 9 +++++++ 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 5c1ff2b76fdaa..39e2146764c03 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -592,6 +592,14 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { } } + /** An extractor that matches all binary comparison operators except null-safe equality. */ + object OperatorsInPartitionFilters { + def unapply(e: BinaryComparison): Option[(Expression, Expression)] = e match { + case _: EqualNullSafe => None + case _ => Some((e.left, e.right)) + } + } + private def convertBasicFilters(table: Table, filters: Seq[Expression]): String = { // hive varchar is treated as catalyst string, but hive varchar can't be pushed down. 
lazy val varcharKeys = table.getPartitionKeys.asScala @@ -600,14 +608,14 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { .map(col => col.getName).toSet filters.collect { - case op @ BinaryComparison(a: Attribute, Literal(v, _: IntegralType)) => + case op @ OperatorsInPartitionFilters(a: Attribute, Literal(v, _: IntegralType)) => s"${a.name} ${op.symbol} $v" - case op @ BinaryComparison(Literal(v, _: IntegralType), a: Attribute) => + case op @ OperatorsInPartitionFilters(Literal(v, _: IntegralType), a: Attribute) => s"$v ${op.symbol} ${a.name}" - case op @ BinaryComparison(a: Attribute, Literal(v, _: StringType)) + case op @ OperatorsInPartitionFilters(a: Attribute, Literal(v, _: StringType)) if !varcharKeys.contains(a.name) => s"""${a.name} ${op.symbol} ${quoteStringLiteral(v.toString)}""" - case op @ BinaryComparison(Literal(v, _: StringType), a: Attribute) + case op @ OperatorsInPartitionFilters(Literal(v, _: StringType), a: Attribute) if !varcharKeys.contains(a.name) => s"""${quoteStringLiteral(v.toString)} ${op.symbol} ${a.name}""" }.mkString(" and ") @@ -666,16 +674,16 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { case InSet(a: Attribute, ExtractableValues(values)) if !varcharKeys.contains(a.name) && values.nonEmpty => convertInToOr(a, values) - case op @ BinaryComparison(a: Attribute, ExtractableLiteral(value)) + case op @ OperatorsInPartitionFilters(a: Attribute, ExtractableLiteral(value)) if !varcharKeys.contains(a.name) => s"${a.name} ${op.symbol} $value" - case op @ BinaryComparison(ExtractableLiteral(value), a: Attribute) + case op @ OperatorsInPartitionFilters(ExtractableLiteral(value), a: Attribute) if !varcharKeys.contains(a.name) => s"$value ${op.symbol} ${a.name}" - case op @ And(expr1, expr2) + case And(expr1, expr2) if convert.isDefinedAt(expr1) || convert.isDefinedAt(expr2) => (convert.lift(expr1) ++ convert.lift(expr2)).mkString("(", " and ", ")") - case op @ Or(expr1, expr2) + case Or(expr1, expr2) if 
convert.isDefinedAt(expr1) && convert.isDefinedAt(expr2) => s"(${convert(expr1)} or ${convert(expr2)})" } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala index 3eedcf7e0874e..ce53acef51503 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala @@ -78,6 +78,15 @@ class HiveClientSuite(version: String) assert(filteredPartitions.size == testPartitionCount) } + test("getPartitionsByFilter: ds<=>20170101") { + // Should return all partitions because <=> cannot be pushed down to the Hive metastore + testMetastorePartitionFiltering( + "ds<=>20170101", + 20170101 to 20170103, + 0 to 23, + "aa" :: "ab" :: "ba" :: "bb" :: Nil) + } + test("getPartitionsByFilter: ds=20170101") { testMetastorePartitionFiltering( "ds=20170101", From be0e276a371319eb19467707a105d968d685a0c3 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 6 Nov 2017 23:13:50 -0800 Subject: [PATCH 2/3] fix. --- .../spark/sql/hive/client/HiveShim.scala | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 39e2146764c03..d99b0ccbd2648 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -592,8 +592,13 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { } } - /** An extractor that matches all binary comparison operators except null-safe equality. */ - object OperatorsInPartitionFilters { + + /** + * An extractor that matches all binary comparison operators except null-safe equality. 
+ * + * null-safe equality is not supported by Hive metastore partition predicate pushdown + */ + object OperatorsInMetastorePartitionFPD { def unapply(e: BinaryComparison): Option[(Expression, Expression)] = e match { case _: EqualNullSafe => None case _ => Some((e.left, e.right)) @@ -608,14 +613,14 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { .map(col => col.getName).toSet filters.collect { - case op @ OperatorsInPartitionFilters(a: Attribute, Literal(v, _: IntegralType)) => + case op @ OperatorsInMetastorePartitionFPD(a: Attribute, Literal(v, _: IntegralType)) => s"${a.name} ${op.symbol} $v" - case op @ OperatorsInPartitionFilters(Literal(v, _: IntegralType), a: Attribute) => + case op @ OperatorsInMetastorePartitionFPD(Literal(v, _: IntegralType), a: Attribute) => s"$v ${op.symbol} ${a.name}" - case op @ OperatorsInPartitionFilters(a: Attribute, Literal(v, _: StringType)) + case op @ OperatorsInMetastorePartitionFPD(a: Attribute, Literal(v, _: StringType)) if !varcharKeys.contains(a.name) => s"""${a.name} ${op.symbol} ${quoteStringLiteral(v.toString)}""" - case op @ OperatorsInPartitionFilters(Literal(v, _: StringType), a: Attribute) + case op @ OperatorsInMetastorePartitionFPD(Literal(v, _: StringType), a: Attribute) if !varcharKeys.contains(a.name) => s"""${quoteStringLiteral(v.toString)} ${op.symbol} ${a.name}""" }.mkString(" and ") @@ -674,10 +679,10 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { case InSet(a: Attribute, ExtractableValues(values)) if !varcharKeys.contains(a.name) && values.nonEmpty => convertInToOr(a, values) - case op @ OperatorsInPartitionFilters(a: Attribute, ExtractableLiteral(value)) + case op @ OperatorsInMetastorePartitionFPD(a: Attribute, ExtractableLiteral(value)) if !varcharKeys.contains(a.name) => s"${a.name} ${op.symbol} $value" - case op @ OperatorsInPartitionFilters(ExtractableLiteral(value), a: Attribute) + case op @ OperatorsInMetastorePartitionFPD(ExtractableLiteral(value), a: Attribute) if 
!varcharKeys.contains(a.name) => s"$value ${op.symbol} ${a.name}" case And(expr1, expr2) From bae74941c28a3efb0b713e97b5fb57af7be654eb Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 7 Nov 2017 09:30:06 -0800 Subject: [PATCH 3/3] fix. --- .../apache/spark/sql/hive/client/HiveShim.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index d99b0ccbd2648..bd1b300416990 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -596,9 +596,9 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { /** * An extractor that matches all binary comparison operators except null-safe equality. * - * null-safe equality is not supported by Hive metastore partition predicate pushdown + * Null-safe equality is not supported by Hive metastore partition predicate pushdown */ - object OperatorsInMetastorePartitionFPD { + object SpecialBinaryComparison { def unapply(e: BinaryComparison): Option[(Expression, Expression)] = e match { case _: EqualNullSafe => None case _ => Some((e.left, e.right)) @@ -613,14 +613,14 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { .map(col => col.getName).toSet filters.collect { - case op @ OperatorsInMetastorePartitionFPD(a: Attribute, Literal(v, _: IntegralType)) => + case op @ SpecialBinaryComparison(a: Attribute, Literal(v, _: IntegralType)) => s"${a.name} ${op.symbol} $v" - case op @ OperatorsInMetastorePartitionFPD(Literal(v, _: IntegralType), a: Attribute) => + case op @ SpecialBinaryComparison(Literal(v, _: IntegralType), a: Attribute) => s"$v ${op.symbol} ${a.name}" - case op @ OperatorsInMetastorePartitionFPD(a: Attribute, Literal(v, _: StringType)) + case op @ SpecialBinaryComparison(a: Attribute, Literal(v, _: StringType)) if 
!varcharKeys.contains(a.name) => s"""${a.name} ${op.symbol} ${quoteStringLiteral(v.toString)}""" - case op @ OperatorsInMetastorePartitionFPD(Literal(v, _: StringType), a: Attribute) + case op @ SpecialBinaryComparison(Literal(v, _: StringType), a: Attribute) if !varcharKeys.contains(a.name) => s"""${quoteStringLiteral(v.toString)} ${op.symbol} ${a.name}""" }.mkString(" and ") @@ -679,10 +679,10 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { case InSet(a: Attribute, ExtractableValues(values)) if !varcharKeys.contains(a.name) && values.nonEmpty => convertInToOr(a, values) - case op @ OperatorsInMetastorePartitionFPD(a: Attribute, ExtractableLiteral(value)) + case op @ SpecialBinaryComparison(a: Attribute, ExtractableLiteral(value)) if !varcharKeys.contains(a.name) => s"${a.name} ${op.symbol} $value" - case op @ OperatorsInMetastorePartitionFPD(ExtractableLiteral(value), a: Attribute) + case op @ SpecialBinaryComparison(ExtractableLiteral(value), a: Attribute) if !varcharKeys.contains(a.name) => s"$value ${op.symbol} ${a.name}" case And(expr1, expr2)