From f27288e29211d47e24767cf7731914cdf9865bc1 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 1 Oct 2015 19:14:14 +0800 Subject: [PATCH 1/4] Push down string filters to Parquet. --- .../datasources/parquet/ParquetFilters.scala | 41 +++++++++++++++++++ .../parquet/ParquetFilterSuite.scala | 20 +++++++++ 2 files changed, 61 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index c6b3fe7900da8..05a93fcb3cc81 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -48,6 +48,32 @@ private[sql] object ParquetFilters { override def inverseCanDrop(statistics: Statistics[T]): Boolean = false } + object StringFilter extends Enumeration { + type Mode = Value + val STARTS_WITH, ENDS_WITH, CONTAINS = Value + } + + case class StringFilter( + v: java.lang.String, + mode: StringFilter.Mode) extends UserDefinedPredicate[Binary] with Serializable { + + override def keep(value: Binary): Boolean = { + val str = value.toStringUsingUTF8() + mode match { + case StringFilter.STARTS_WITH => + str.startsWith(v) + case StringFilter.ENDS_WITH => + str.endsWith(v) + case StringFilter.CONTAINS => + str.contains(v) + } + } + + override def canDrop(statistics: Statistics[Binary]): Boolean = false + + override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = false + } + private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = { case BooleanType => (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean]) @@ -164,6 +190,14 @@ private[sql] object ParquetFilters { FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]])) } + private val makeStringFilter: PartialFunction[DataType, + (String, String, StringFilter.Mode) => FilterPredicate] = { + case StringType => + (n: String, v: String, mode: StringFilter.Mode) => + FilterApi.userDefined(binaryColumn(n), + StringFilter(v.asInstanceOf[java.lang.String], mode)) + } + private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = { case IntegerType => (n: String, v: Set[Any]) => @@ -236,6 +270,13 @@ private[sql] object ParquetFilters { case sources.GreaterThanOrEqual(name, value) => makeGtEq.lift(dataTypeOf(name)).map(_(name, value)) + case sources.StringStartsWith(name, value) => + makeStringFilter.lift(dataTypeOf(name)).map(_(name, value, StringFilter.STARTS_WITH)) + case sources.StringEndsWith(name, value) => + makeStringFilter.lift(dataTypeOf(name)).map(_(name, value, StringFilter.ENDS_WITH)) + case sources.StringContains(name, value) => + makeStringFilter.lift(dataTypeOf(name)).map(_(name, value, StringFilter.CONTAINS)) + case sources.And(lhs, rhs) => (createFilter(schema, lhs) ++ createFilter(schema, rhs)).reduceOption(FilterApi.and) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 45ad3fde559c0..883c57e0feb9e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -245,6 +245,26 @@ class 
ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex checkFilterPredicate(!('_1 < "4"), classOf[GtEq[_]], "4") checkFilterPredicate('_1 < "2" || '_1 > "3", classOf[Operators.Or], Seq(Row("1"), Row("4"))) } + + withParquetDataFrame((1 to 4).map(i => Tuple1(i.toString * 5 + "test"))) { implicit df => + checkFilterPredicate(('_1 contains "11").asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], "11111test") + + checkFilterPredicate(('_1 contains "2test").asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], "22222test") + + checkFilterPredicate(('_1 contains "3t").asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], "33333test") + + checkFilterPredicate(('_1 startsWith "22").asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], "22222test") + + checkFilterPredicate(('_1 endsWith "4test").asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], "44444test") + + checkFilterPredicate(('_1 endsWith "2test").asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], "22222test") + } } test("filter pushdown - binary") { From 4d00ed08ab1019d69948685e5c099fa5eaa2156a Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 3 Oct 2015 22:45:09 +0800 Subject: [PATCH 2/4] Move pattern matching code out of repeatedly called function. --- .../datasources/parquet/ParquetFilters.scala | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 05a93fcb3cc81..c5e452563c22a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -57,16 +57,18 @@ private[sql] object ParquetFilters { v: java.lang.String, mode: StringFilter.Mode) extends UserDefinedPredicate[Binary] with Serializable { + private val compare = mode match { + case StringFilter.STARTS_WITH => + (x: java.lang.String) => x.startsWith(v) + case StringFilter.ENDS_WITH => + (x: java.lang.String) => x.endsWith(v) + case StringFilter.CONTAINS => + (x: java.lang.String) => x.contains(v) + } + override def keep(value: Binary): Boolean = { val str = value.toStringUsingUTF8() - mode match { - case StringFilter.STARTS_WITH => - str.startsWith(v) - case StringFilter.ENDS_WITH => - str.endsWith(v) - case StringFilter.CONTAINS => - str.contains(v) - } + compare(str) } override def canDrop(statistics: Statistics[Binary]): Boolean = false From eb134b993720a42154c430e508847f852882c5c1 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 5 Oct 2015 23:27:24 +0800 Subject: [PATCH 3/4] Push down In filter to Parquet. 
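
Map sources.In to a Parquet user-defined predicate instead of leaving it
unhandled. For each supported column type, the values are collected into a
Set and each record is tested via SetInFilter.keep; canDrop stays
conservative (always false) here and is tightened with row-group statistics
later in this series. A rough sketch of the resulting mapping, using names
from this patch (illustrative only, not additional code in the diff below):

    // sources.In("a", Array(1, 3)) on an INT32 column becomes roughly:
    val predicate = FilterApi.userDefined(
      FilterApi.intColumn("a"), SetInFilter(Set[java.lang.Integer](1, 3)))
    // keep() then retains exactly the non-null values contained in the set.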
--- .../datasources/parquet/ParquetFilters.scala | 6 ++++ .../parquet/ParquetFilterSuite.scala | 35 +++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index c5e452563c22a..1df304d7ffad7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -201,6 +201,9 @@ private[sql] object ParquetFilters { } private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = { + case BooleanType => + (n: String, v: Set[Any]) => + FilterApi.userDefined(booleanColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Boolean]])) case IntegerType => (n: String, v: Set[Any]) => FilterApi.userDefined(intColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Integer]])) @@ -252,6 +255,9 @@ private[sql] object ParquetFilters { case sources.IsNotNull(name) => makeNotEq.lift(dataTypeOf(name)).map(_(name, null)) + case sources.In(name, values) => + makeInSet.lift(dataTypeOf(name)).map(_(name, values.toSet)) + case sources.EqualTo(name, value) => makeEq.lift(dataTypeOf(name)).map(_(name, value)) case sources.Not(sources.EqualTo(name, value)) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 883c57e0feb9e..c719026d257bf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -112,6 +112,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex checkFilterPredicate('_1 === true, classOf[Eq[_]], true) checkFilterPredicate('_1 <=> true, classOf[Eq[_]], true) checkFilterPredicate('_1 !== true, classOf[NotEq[_]], false) + + checkFilterPredicate(('_1 in(true)).asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], true) + checkFilterPredicate(('_1 in(false)).asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], false) } } @@ -138,6 +143,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4) checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4))) + + checkFilterPredicate(('_1 in(1, 2)).asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], Seq(Row(1), Row(2))) + checkFilterPredicate(('_1 in(3, 4)).asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], Seq(Row(3), Row(4))) } } @@ -164,6 +174,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4) checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4))) + + checkFilterPredicate(('_1 in(1L, 2L)).asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], Seq(Row(1L), Row(2L))) + checkFilterPredicate(('_1 in(3L, 4L)).asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], Seq(Row(3L), Row(4L))) } } @@ -190,6 +205,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4) 
checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
+
+      checkFilterPredicate(('_1 in(1.0f, 2.0f)).asInstanceOf[Predicate],
+        classOf[UserDefinedByInstance[_, _]], Seq(Row(1.0f), Row(2.0f)))
+      checkFilterPredicate(('_1 in(3.0f, 4.0f)).asInstanceOf[Predicate],
+        classOf[UserDefinedByInstance[_, _]], Seq(Row(3.0f), Row(4.0f)))
     }
   }
 
@@ -216,6 +236,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
       checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
+
+      checkFilterPredicate(('_1 in(1.0, 2.0)).asInstanceOf[Predicate],
+        classOf[UserDefinedByInstance[_, _]], Seq(Row(1.0), Row(2.0)))
+      checkFilterPredicate(('_1 in(3.0, 4.0)).asInstanceOf[Predicate],
+        classOf[UserDefinedByInstance[_, _]], Seq(Row(3.0), Row(4.0)))
     }
   }
 
@@ -244,6 +269,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       checkFilterPredicate(!('_1 < "4"), classOf[GtEq[_]], "4")
       checkFilterPredicate('_1 < "2" || '_1 > "3", classOf[Operators.Or], Seq(Row("1"), Row("4")))
+
+      checkFilterPredicate(('_1 in("1", "2")).asInstanceOf[Predicate],
+        classOf[UserDefinedByInstance[_, _]], Seq(Row("1"), Row("2")))
+      checkFilterPredicate(('_1 in("3", "4")).asInstanceOf[Predicate],
+        classOf[UserDefinedByInstance[_, _]], Seq(Row("3"), Row("4")))
     }
 
     withParquetDataFrame((1 to 4).map(i => Tuple1(i.toString * 5 + "test"))) { implicit df =>
@@ -298,6 +328,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     checkBinaryFilterPredicate(!('_1 < 4.b), classOf[GtEq[_]], 4.b)
     checkBinaryFilterPredicate(
       '_1 < 2.b || '_1 > 3.b, classOf[Operators.Or], Seq(Row(1.b), Row(4.b)))
+
+      checkFilterPredicate(('_1 in(1.b, 2.b)).asInstanceOf[Predicate],
+        classOf[UserDefinedByInstance[_, _]], Seq(Row(1.b), Row(2.b)))
+      checkFilterPredicate(('_1 in(3.b, 4.b)).asInstanceOf[Predicate],
+        classOf[UserDefinedByInstance[_, _]], Seq(Row(3.b), Row(4.b)))
     }
   }
 

From 02bbab84a82d645250b2afe9c58bed194523da38 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sat, 17 Oct 2015 11:53:33 +0800
Subject: [PATCH 4/4] Address review comments.
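
- Rename SetInFilter to InSetFilter and implement canDrop from row-group
  statistics: a row group is skipped when stats.max < min(valueSet) or
  stats.min > max(valueSet). For example, with valueSet = {3, 4}, a row
  group whose statistics are [min = 7, max = 9] is dropped (7 > 4), while
  one with [min = 1, max = 5] still has to be scanned. The set is assumed
  non-empty and null-free; otherwise computing its min/max would fail.
- Replace the Enumeration-based StringFilter with one predicate class per
  operation (StringStartsWithFilter, StringEndsWithFilter,
  StringContainsFilter), removing the mode field and the construction-time
  pattern match.
- Compare strings as UTF8String views over the Binary's underlying byte
  array instead of materializing a java.lang.String for every record.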
--- .../datasources/parquet/ParquetFilters.scala | 99 +++++++++------ .../parquet/ParquetFilterSuite.scala | 116 ++++++++++++------ 2 files changed, 137 insertions(+), 78 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 37306d0577150..43b00e4b08ee6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -27,46 +27,53 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.spark.sql.sources import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String private[sql] object ParquetFilters { - case class SetInFilter[T <: Comparable[T]]( - valueSet: Set[T]) extends UserDefinedPredicate[T] with Serializable { + case class InSetFilter[T <: Comparable[T]](valueSet: Set[T]) + extends UserDefinedPredicate[T] { + + private val min = valueSet.min + private val max = valueSet.max override def keep(value: T): Boolean = { value != null && valueSet.contains(value) } - override def canDrop(statistics: Statistics[T]): Boolean = false + override def canDrop(statistics: Statistics[T]): Boolean = { + statistics.getMax.compareTo(min) < 0 || statistics.getMin.compareTo(max) > 0 + } override def inverseCanDrop(statistics: Statistics[T]): Boolean = false } - object StringFilter extends Enumeration { - type Mode = Value - val STARTS_WITH, ENDS_WITH, CONTAINS = Value - } + abstract class StringFilter extends UserDefinedPredicate[Binary] { + override def canDrop(statistics: Statistics[Binary]): Boolean = false + override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = false - case class StringFilter( - v: java.lang.String, - mode: StringFilter.Mode) extends UserDefinedPredicate[Binary] with Serializable { - - private val compare = mode match { - case StringFilter.STARTS_WITH => - (x: java.lang.String) => x.startsWith(v) - case StringFilter.ENDS_WITH => - (x: java.lang.String) => x.endsWith(v) - case StringFilter.CONTAINS => - (x: java.lang.String) => x.contains(v) + def binaryToUTF8String(value: Binary): UTF8String = { + // This is a trick used in CatalystStringConverter to steal the underlying + // byte array of the binary without copying it. 
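+      // Note (assumption): this relies on the ByteBuffer returned by
+      // Binary.toByteBuffer being heap-allocated with arrayOffset() == 0,
+      // which holds for wrap()-based binaries; ByteBuffer.array() would
+      // throw for a direct buffer.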
+ val buffer = value.toByteBuffer + val offset = buffer.position() + val numBytes = buffer.limit() - buffer.position() + UTF8String.fromBytes(buffer.array(), offset, numBytes) } + } - override def keep(value: Binary): Boolean = { - val str = value.toStringUsingUTF8() - compare(str) - } + case class StringStartsWithFilter(prefix: String) extends StringFilter { + private val strToCompare: UTF8String = UTF8String.fromString(prefix) + override def keep(value: Binary): Boolean = binaryToUTF8String(value).startsWith(strToCompare) + } - override def canDrop(statistics: Statistics[Binary]): Boolean = false + case class StringEndsWithFilter(suffix: String) extends StringFilter { + private val strToCompare: UTF8String = UTF8String.fromString(suffix) + override def keep(value: Binary): Boolean = binaryToUTF8String(value).endsWith(strToCompare) + } - override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = false + case class StringContainsFilter(str: String) extends StringFilter { + private val strToCompare: UTF8String = UTF8String.fromString(str) + override def keep(value: Binary): Boolean = binaryToUTF8String(value).contains(strToCompare) } private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = { @@ -185,38 +192,54 @@ private[sql] object ParquetFilters { FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]])) } - private val makeStringFilter: PartialFunction[DataType, - (String, String, StringFilter.Mode) => FilterPredicate] = { + private val makeStringStartsFilter: PartialFunction[DataType, + (String, String) => FilterPredicate] = { + case StringType => + (n: String, v: String) => + FilterApi.userDefined(binaryColumn(n), + StringStartsWithFilter(v.asInstanceOf[java.lang.String])) + } + + private val makeStringEndsFilter: PartialFunction[DataType, + (String, String) => FilterPredicate] = { + case StringType => + (n: String, v: String) => + FilterApi.userDefined(binaryColumn(n), + StringEndsWithFilter(v.asInstanceOf[java.lang.String])) + } + + private val makeStringContainsFilter: PartialFunction[DataType, + (String, String) => FilterPredicate] = { case StringType => - (n: String, v: String, mode: StringFilter.Mode) => + (n: String, v: String) => FilterApi.userDefined(binaryColumn(n), - StringFilter(v.asInstanceOf[java.lang.String], mode)) + StringContainsFilter(v.asInstanceOf[java.lang.String])) } private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = { case BooleanType => (n: String, v: Set[Any]) => - FilterApi.userDefined(booleanColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Boolean]])) + FilterApi.userDefined(booleanColumn(n), InSetFilter(v.asInstanceOf[Set[java.lang.Boolean]])) case IntegerType => (n: String, v: Set[Any]) => - FilterApi.userDefined(intColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Integer]])) + FilterApi.userDefined(intColumn(n), InSetFilter(v.asInstanceOf[Set[java.lang.Integer]])) case LongType => (n: String, v: Set[Any]) => - FilterApi.userDefined(longColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Long]])) + FilterApi.userDefined(longColumn(n), InSetFilter(v.asInstanceOf[Set[java.lang.Long]])) case FloatType => (n: String, v: Set[Any]) => - FilterApi.userDefined(floatColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Float]])) + FilterApi.userDefined(floatColumn(n), InSetFilter(v.asInstanceOf[Set[java.lang.Float]])) case DoubleType => (n: String, v: Set[Any]) => - FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]])) + 
FilterApi.userDefined(doubleColumn(n), InSetFilter(v.asInstanceOf[Set[java.lang.Double]])) case StringType => (n: String, v: Set[Any]) => FilterApi.userDefined(binaryColumn(n), - SetInFilter(v.map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8"))))) + InSetFilter(v.map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8"))))) case BinaryType => (n: String, v: Set[Any]) => FilterApi.userDefined(binaryColumn(n), - SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[Array[Byte]])))) + InSetFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[Array[Byte]])))) } /** @@ -272,11 +295,11 @@ private[sql] object ParquetFilters { makeGtEq.lift(dataTypeOf(name)).map(_(name, value)) case sources.StringStartsWith(name, value) => - makeStringFilter.lift(dataTypeOf(name)).map(_(name, value, StringFilter.STARTS_WITH)) + makeStringStartsFilter.lift(dataTypeOf(name)).map(_(name, value)) case sources.StringEndsWith(name, value) => - makeStringFilter.lift(dataTypeOf(name)).map(_(name, value, StringFilter.ENDS_WITH)) + makeStringEndsFilter.lift(dataTypeOf(name)).map(_(name, value)) case sources.StringContains(name, value) => - makeStringFilter.lift(dataTypeOf(name)).map(_(name, value, StringFilter.CONTAINS)) + makeStringContainsFilter.lift(dataTypeOf(name)).map(_(name, value)) case sources.And(lhs, rhs) => (createFilter(schema, lhs) ++ createFilter(schema, rhs)).reduceOption(FilterApi.and) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 5e34b28068393..5b9e3f08e22e8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -113,10 +113,10 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex checkFilterPredicate('_1 <=> true, classOf[Eq[_]], true) checkFilterPredicate('_1 !== true, classOf[NotEq[_]], false) - checkFilterPredicate(('_1 in(true)).asInstanceOf[Predicate], - classOf[UserDefinedByInstance[_, _]], true) - checkFilterPredicate(('_1 in(false)).asInstanceOf[Predicate], - classOf[UserDefinedByInstance[_, _]], false) + checkFilterPredicate( + ('_1.in(true)).asInstanceOf[Predicate], classOf[UserDefinedByInstance[_, _]], true) + checkFilterPredicate( + ('_1.in(false)).asInstanceOf[Predicate], classOf[UserDefinedByInstance[_, _]], false) } } @@ -144,10 +144,14 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4) checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4))) - checkFilterPredicate(('_1 in(1, 2)).asInstanceOf[Predicate], - classOf[UserDefinedByInstance[_, _]], Seq(Row(1), Row(2))) - checkFilterPredicate(('_1 in(3, 4)).asInstanceOf[Predicate], - classOf[UserDefinedByInstance[_, _]], Seq(Row(3), Row(4))) + checkFilterPredicate( + ('_1.in(1, 2)).asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], + Seq(Row(1), Row(2))) + checkFilterPredicate( + ('_1.in(3, 4)).asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], + Seq(Row(3), Row(4))) } } @@ -175,10 +179,14 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4) checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), 
Row(4))) - checkFilterPredicate(('_1 in(1L, 2L)).asInstanceOf[Predicate], - classOf[UserDefinedByInstance[_, _]], Seq(Row(1L), Row(2L))) - checkFilterPredicate(('_1 in(3L, 4L)).asInstanceOf[Predicate], - classOf[UserDefinedByInstance[_, _]], Seq(Row(3L), Row(4L))) + checkFilterPredicate( + ('_1.in(1L, 2L)).asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], + Seq(Row(1L), Row(2L))) + checkFilterPredicate( + ('_1.in(3L, 4L)).asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], + Seq(Row(3L), Row(4L))) } } @@ -206,10 +214,14 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4) checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4))) - checkFilterPredicate(('_1 in(1.0f, 2.0f)).asInstanceOf[Predicate], - classOf[UserDefinedByInstance[_, _]], Seq(Row(1.0f), Row(2.0f))) - checkFilterPredicate(('_1 in(3.0f, 4.0f)).asInstanceOf[Predicate], - classOf[UserDefinedByInstance[_, _]], Seq(Row(3.0f), Row(4.0f))) + checkFilterPredicate( + ('_1.in(1.0f, 2.0f)).asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], + Seq(Row(1.0f), Row(2.0f))) + checkFilterPredicate( + ('_1.in(3.0f, 4.0f)).asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], + Seq(Row(3.0f), Row(4.0f))) } } @@ -237,10 +249,14 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4) checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4))) - checkFilterPredicate(('_1 in(1.0, 2.0)).asInstanceOf[Predicate], - classOf[UserDefinedByInstance[_, _]], Seq(Row(1.0), Row(2.0))) - checkFilterPredicate(('_1 in(3.0, 4.0)).asInstanceOf[Predicate], - classOf[UserDefinedByInstance[_, _]], Seq(Row(3.0), Row(4.0))) + checkFilterPredicate( + ('_1.in(1.0, 2.0)).asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], + Seq(Row(1.0), Row(2.0))) + checkFilterPredicate( + ('_1.in(3.0, 4.0)).asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], + Seq(Row(3.0), Row(4.0))) } } @@ -270,30 +286,46 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex checkFilterPredicate(!('_1 < "4"), classOf[GtEq[_]], "4") checkFilterPredicate('_1 < "2" || '_1 > "3", classOf[Operators.Or], Seq(Row("1"), Row("4"))) - checkFilterPredicate(('_1 in("1", "2")).asInstanceOf[Predicate], - classOf[UserDefinedByInstance[_, _]], Seq(Row("1"), Row("2"))) - checkFilterPredicate(('_1 in("3", "4")).asInstanceOf[Predicate], - classOf[UserDefinedByInstance[_, _]], Seq(Row("3"), Row("4"))) + checkFilterPredicate( + ('_1.in("1", "2")).asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], + Seq(Row("1"), Row("2"))) + checkFilterPredicate( + ('_1.in("3", "4")).asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], + Seq(Row("3"), Row("4"))) } withParquetDataFrame((1 to 4).map(i => Tuple1(i.toString * 5 + "test"))) { implicit df => - checkFilterPredicate(('_1 contains "11").asInstanceOf[Predicate], - classOf[UserDefinedByInstance[_, _]], "11111test") + checkFilterPredicate( + ('_1 contains "11").asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], + "11111test") - checkFilterPredicate(('_1 contains "2test").asInstanceOf[Predicate], - classOf[UserDefinedByInstance[_, _]], "22222test") + checkFilterPredicate( + ('_1 contains "2test").asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], + "22222test") - checkFilterPredicate(('_1 contains 
"3t").asInstanceOf[Predicate], - classOf[UserDefinedByInstance[_, _]], "33333test") + checkFilterPredicate( + ('_1 contains "3t").asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], + "33333test") - checkFilterPredicate(('_1 startsWith "22").asInstanceOf[Predicate], - classOf[UserDefinedByInstance[_, _]], "22222test") + checkFilterPredicate( + ('_1 startsWith "22").asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], + "22222test") - checkFilterPredicate(('_1 endsWith "4test").asInstanceOf[Predicate], - classOf[UserDefinedByInstance[_, _]], "44444test") + checkFilterPredicate( + ('_1 endsWith "4test").asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], + "44444test") - checkFilterPredicate(('_1 endsWith "2test").asInstanceOf[Predicate], - classOf[UserDefinedByInstance[_, _]], "22222test") + checkFilterPredicate( + ('_1 endsWith "2test").asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], + "22222test") } } @@ -329,10 +361,14 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex checkBinaryFilterPredicate( '_1 < 2.b || '_1 > 3.b, classOf[Operators.Or], Seq(Row(1.b), Row(4.b))) - checkFilterPredicate(('_1 in(1.b, 2.b)).asInstanceOf[Predicate], - classOf[UserDefinedByInstance[_, _]], Seq(Row(1.b), Row(2.b))) - checkFilterPredicate(('_1 in(3.b, 4.b)).asInstanceOf[Predicate], - classOf[UserDefinedByInstance[_, _]], Seq(Row(3.b), Row(4.b))) + checkFilterPredicate( + ('_1.in(1.b, 2.b)).asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], + Seq(Row(1.b), Row(2.b))) + checkFilterPredicate( + ('_1.in(3.b, 4.b)).asInstanceOf[Predicate], + classOf[UserDefinedByInstance[_, _]], + Seq(Row(3.b), Row(4.b))) } }