From 0558b6cfd916c9b48f79adfa4e0cf2ce821d51ec Mon Sep 17 00:00:00 2001
From: Alexey Kudinkin
Date: Thu, 22 Sep 2022 15:43:45 -0700
Subject: [PATCH] [HUDI-4851] Fixing handling of `UTF8String` w/in `InSet` operator (#6739)

Co-authored-by: Raymond Xu <2701446+xushiyan@users.noreply.github.com>
---
 .../spark/sql/hudi/DataSkippingUtils.scala  | 12 ++++++++++--
 .../apache/hudi/TestDataSkippingUtils.scala | 29 +++++++++++++++++++++++++++--
 .../encoders/DummyExpressionHolder.scala    | 29 +++++++++++++++++++++++++++++
 3 files changed, 66 insertions(+), 4 deletions(-)
 create mode 100644 hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/encoders/DummyExpressionHolder.scala

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
index 0fe62da0ded3..4a3cf38895f0 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, EqualNullSafe, EqualTo, Expression, ExtractValue, GetStructField, GreaterThan, GreaterThanOrEqual, In, InSet, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not, Or, StartsWith, SubqueryExpression}
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.hudi.ColumnStatsExpressionUtils._
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.{AnalysisException, HoodieCatalystExpressionUtils}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -234,7 +234,15 @@ object DataSkippingUtils extends Logging {
       getTargetIndexedColumnName(attrRef, indexSchema)
         .map { colName =>
           val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
-          hset.map(value => genColumnValuesEqualToExpression(colName, Literal(value), targetExprBuilder)).reduce(Or)
+          hset.map { value =>
+            // NOTE: [[Literal]] has a gap where it could hold [[UTF8String]], but [[Literal#apply]] doesn't
+            //       accept [[UTF8String]]. As such we have to handle it separately
+            val lit = value match {
+              case str: UTF8String => Literal(str.toString)
+              case _ => Literal(value)
+            }
+            genColumnValuesEqualToExpression(colName, lit, targetExprBuilder)
+          }.reduce(Or)
         }
 
     // Filter "expr(colA) not in (B1, B2, ...)"
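A note on the DataSkippingUtils change above: Literal#apply pattern-matches on external Scala/Java types (String, Int, and so on), while an InSet#hset produced by the optimizer holds Catalyst-internal values, i.e. UTF8String for strings, so the old code's Literal(value) call threw for string sets. Below is a minimal sketch of that gap, not part of the patch; it assumes a Spark Catalyst dependency on the classpath, and the object name LiteralGapSketch is illustrative:

    import org.apache.spark.sql.catalyst.expressions.Literal
    import org.apache.spark.sql.types.StringType
    import org.apache.spark.unsafe.types.UTF8String

    object LiteralGapSketch extends App {
      val utf8 = UTF8String.fromString("a1")

      // In the Spark versions this patch targets, Literal.apply has no case for
      // UTF8String and falls through to "Unsupported literal type ...":
      //   Literal(utf8) // throws RuntimeException

      // Round-tripping through java.lang.String, as the patch does, is safe:
      val lit = Literal(utf8.toString)
      assert(lit.dataType == StringType) // apply() re-encodes the value as UTF8String internally
    }

The case-class constructor Literal(value, dataType) accepts values already in internal form, so Literal(str, StringType) would be an equivalent fix.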
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
index da3fd52e97e1..63db2f52fc2a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
@@ -21,7 +21,11 @@ import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.spark.sql.HoodieCatalystExpressionUtils.resolveExpr
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.encoders.DummyExpressionHolder
 import org.apache.spark.sql.catalyst.expressions.{Expression, InSet, Not}
+import org.apache.spark.sql.catalyst.optimizer.OptimizeIn
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.functions.{col, lower}
 import org.apache.spark.sql.hudi.DataSkippingUtils
 import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
@@ -93,7 +97,8 @@ class TestDataSkippingUtils extends HoodieClientTestBase with SparkAdapterSuppor
     spark.sqlContext.setConf(SESSION_LOCAL_TIMEZONE.key, "UTC")
 
     val resolvedFilterExpr: Expression = resolveExpr(spark, sourceFilterExprStr, sourceTableSchema)
-    val rows: Seq[String] = applyFilterExpr(resolvedFilterExpr, input)
+    val optimizedExpr = optimize(resolvedFilterExpr)
+    val rows: Seq[String] = applyFilterExpr(optimizedExpr, input)
     assertEquals(expectedOutput, rows)
   }
 
@@ -113,7 +118,6 @@ class TestDataSkippingUtils extends HoodieClientTestBase with SparkAdapterSuppor
     assertEquals(expectedOutput, rows)
   }
 
-
   @ParameterizedTest
   @MethodSource(Array("testStringsLookupFilterExpressionsSource"))
   def testStringsLookupFilterExpressions(sourceExpr: Expression, input: Seq[IndexRow], output: Seq[String]): Unit = {
@@ -134,6 +138,19 @@ class TestDataSkippingUtils extends HoodieClientTestBase with SparkAdapterSuppor
     assertEquals(output, rows)
   }
 
+
+  private def optimize(expr: Expression): Expression = {
+    val rules: Seq[Rule[LogicalPlan]] =
+      OptimizeIn ::
+        Nil
+
+    val plan: LogicalPlan = DummyExpressionHolder(Seq(expr))
+
+    rules.foldLeft(plan) {
+      case (plan, rule) => rule.apply(plan)
+    }.asInstanceOf[DummyExpressionHolder].exprs.head
+  }
+
   private def applyFilterExpr(resolvedExpr: Expression, input: Seq[IndexRow]): Seq[String] = {
     val lookupFilter = DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr(resolvedExpr, indexSchema)
 
@@ -324,6 +341,14 @@ object TestDataSkippingUtils {
         IndexRow("file_3", valueCount = 1, -2, -1, 0)
       ),
       Seq("file_1", "file_2")),
+    arguments(
+      s"B in (${(0 to 10).map(i => s"'a$i'").mkString(",")})",
+      Seq(
+        IndexRow("file_1", valueCount = 1, B_minValue = "a0", B_maxValue = "a10", B_nullCount = 0),
+        IndexRow("file_2", valueCount = 1, B_minValue = "b0", B_maxValue = "b10", B_nullCount = 0),
+        IndexRow("file_3", valueCount = 1, B_minValue = "a10", B_maxValue = "b20", B_nullCount = 0)
+      ),
+      Seq("file_1", "file_3")),
     arguments(
       "A not in (0, 1)",
       Seq(
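A note on the new test case: eleven values ((0 to 10) inclusive) are used deliberately, since Spark's OptimizeIn rule only rewrites In into InSet once the value list exceeds spark.sql.optimizer.inSetConversionThreshold (10 by default), and the resulting hset holds the literals' evaluated values, i.e. UTF8String instances, which is exactly the shape that used to break the translation. file_2 is expected to be pruned because no a-prefixed value can fall in its [b0, b10] range, while file_3 survives because a10 lies within [a10, b20]. Below is a standalone sketch of the conversion, not part of the patch; it assumes Catalyst on the classpath, and OptimizeInSketch is an illustrative name:

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, In, InSet, Literal}
    import org.apache.spark.sql.catalyst.optimizer.OptimizeIn
    import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation}
    import org.apache.spark.sql.types.StringType

    object OptimizeInSketch extends App {
      val b = AttributeReference("B", StringType)()
      // 11 literals, i.e. one more than the default conversion threshold of 10
      val in = In(b, (0 to 10).map(i => Literal(s"a$i")))

      val optimized = OptimizeIn(Filter(in, LocalRelation(b)))
      val InSet(_, hset) = optimized.asInstanceOf[Filter].condition
      // The set members are Catalyst-internal UTF8String, not java.lang.String
      println(hset.head.getClass.getName) // org.apache.spark.unsafe.types.UTF8String
    }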
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/encoders/DummyExpressionHolder.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/encoders/DummyExpressionHolder.scala
new file mode 100644
index 000000000000..87dc8c087970
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/encoders/DummyExpressionHolder.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.encoders
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+
+// A dummy logical plan that can hold expressions and go through optimizer rules.
+case class DummyExpressionHolder(exprs: Seq[Expression]) extends LeafNode {
+  override lazy val resolved = true
+  override def output: Seq[Attribute] = Nil
+}
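Why the holder is needed at all: Rule[LogicalPlan] operates on plans, not on bare expressions, but applying a rule transforms the expressions held by every node it visits, so a pre-resolved LeafNode with empty output is the smallest possible carrier for running optimizer rules against a single expression. A hypothetical usage sketch mirroring the test's optimize() helper above (HolderUsageSketch is an illustrative name, and the In expression is just a convenient input for OptimizeIn):

    import org.apache.spark.sql.catalyst.encoders.DummyExpressionHolder
    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, In, Literal}
    import org.apache.spark.sql.catalyst.optimizer.OptimizeIn
    import org.apache.spark.sql.types.StringType

    object HolderUsageSketch extends App {
      val expr: Expression =
        In(AttributeReference("B", StringType)(), (0 to 10).map(i => Literal(s"a$i")))

      // Wrap the bare expression, run the rule over the dummy plan, unwrap.
      // resolved = true stops the rule from trying to re-resolve anything,
      // and output = Nil is fine because the node never produces rows.
      val rewritten = OptimizeIn(DummyExpressionHolder(Seq(expr)))
        .asInstanceOf[DummyExpressionHolder].exprs.head

      println(rewritten) // InSet(B#..., Set(a0, a1, ...))
    }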