[HUDI-4851] Fixing handling of UTF8String w/in InSet operator (#6739)

Co-authored-by: Raymond Xu <2701446+xushiyan@users.noreply.github.com>
2 people authored and yuzhaojing committed Sep 29, 2022
1 parent da71afd commit 7facad1
Showing 3 changed files with 66 additions and 4 deletions.
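For context on the bug this commit fixes: when Spark's OptimizeIn rule rewrites an In predicate into InSet, it stores each value in Catalyst's internal representation, which for strings is UTF8String rather than java.lang.String. Literal.apply, however, only handles external JVM types, so wrapping a value taken straight out of InSet.hset fails. A minimal sketch of the gap (illustrative, not part of the commit):

    import org.apache.spark.sql.catalyst.expressions.Literal
    import org.apache.spark.unsafe.types.UTF8String

    val v = UTF8String.fromString("a1")
    // Literal(v)                  // throws an "Unsupported literal type" error
    val lit = Literal(v.toString)  // fine: Literal.apply accepts java.lang.String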
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, EqualNullSafe, EqualTo, Expression, ExtractValue, GetStructField, GreaterThan, GreaterThanOrEqual, In, InSet, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not, Or, StartsWith, SubqueryExpression}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.hudi.ColumnStatsExpressionUtils._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{AnalysisException, HoodieCatalystExpressionUtils}
import org.apache.spark.unsafe.types.UTF8String

@@ -234,7 +234,15 @@ object DataSkippingUtils extends Logging {
      getTargetIndexedColumnName(attrRef, indexSchema)
        .map { colName =>
          val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
          hset.map(value => genColumnValuesEqualToExpression(colName, Literal(value), targetExprBuilder)).reduce(Or)
          hset.map { value =>
            // NOTE: [[Literal]] has a gap where it could hold [[UTF8String]], but [[Literal#apply]] doesn't
            //       accept [[UTF8String]]. As such we have to handle it separately
            val lit = value match {
              case str: UTF8String => Literal(str.toString)
              case _ => Literal(value)
            }
            genColumnValuesEqualToExpression(colName, lit, targetExprBuilder)
          }.reduce(Or)
        }

// Filter "expr(colA) not in (B1, B2, ...)"
@@ -20,7 +20,11 @@ package org.apache.hudi
import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.encoders.DummyExpressionHolder
import org.apache.spark.sql.catalyst.expressions.{Expression, InSet, Not}
import org.apache.spark.sql.catalyst.optimizer.OptimizeIn
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.functions.{col, lower}
import org.apache.spark.sql.hudi.DataSkippingUtils
import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
@@ -94,7 +98,8 @@ class TestDataSkippingUtils extends HoodieClientTestBase with SparkAdapterSupport
spark.sqlContext.setConf(SESSION_LOCAL_TIMEZONE.key, "UTC")

val resolvedFilterExpr: Expression = exprUtils.resolveExpr(spark, sourceFilterExprStr, sourceTableSchema)
val rows: Seq[String] = applyFilterExpr(resolvedFilterExpr, input)
val optimizedExpr = optimize(resolvedFilterExpr)
val rows: Seq[String] = applyFilterExpr(optimizedExpr, input)

assertEquals(expectedOutput, rows)
}
@@ -114,7 +119,6 @@ class TestDataSkippingUtils extends HoodieClientTestBase with SparkAdapterSupport
assertEquals(expectedOutput, rows)
}


@ParameterizedTest
@MethodSource(Array("testStringsLookupFilterExpressionsSource"))
def testStringsLookupFilterExpressions(sourceExpr: Expression, input: Seq[IndexRow], output: Seq[String]): Unit = {
@@ -135,6 +139,19 @@ class TestDataSkippingUtils extends HoodieClientTestBase with SparkAdapterSupport
assertEquals(output, rows)
}


  private def optimize(expr: Expression): Expression = {
    val rules: Seq[Rule[LogicalPlan]] =
      OptimizeIn ::
        Nil

    val plan: LogicalPlan = DummyExpressionHolder(Seq(expr))

    rules.foldLeft(plan) {
      case (plan, rule) => rule.apply(plan)
    }.asInstanceOf[DummyExpressionHolder].exprs.head
  }

private def applyFilterExpr(resolvedExpr: Expression, input: Seq[IndexRow]): Seq[String] = {
val lookupFilter = DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr(resolvedExpr, indexSchema)

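A hedged usage sketch for the optimize helper above: running OptimizeIn over the wrapped expression is what turns a long In list into the InSet under test, e.g.

    // val resolved  = exprUtils.resolveExpr(spark, "B in ('a0', 'a1', ..., 'a10')", sourceTableSchema)
    // val optimized = optimize(resolved)  // yields InSet(B, Set(...)) rather than In(B, Seq(...))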
@@ -325,6 +342,14 @@ object TestDataSkippingUtils {
IndexRow("file_3", valueCount = 1, -2, -1, 0)
),
Seq("file_1", "file_2")),
arguments(
s"B in (${(0 to 10).map(i => s"'a$i'").mkString(",")})",
Seq(
IndexRow("file_1", valueCount = 1, B_minValue = "a0", B_maxValue = "a10", B_nullCount = 0),
IndexRow("file_2", valueCount = 1, B_minValue = "b0", B_maxValue = "b10", B_nullCount = 0),
IndexRow("file_3", valueCount = 1, B_minValue = "a10", B_maxValue = "b20", B_nullCount = 0)
),
Seq("file_1", "file_3")),
arguments(
"A not in (0, 1)",
Seq(
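A note on the new "B in (...)" case above: it deliberately generates eleven values ('a0' through 'a10') because OptimizeIn only rewrites In into InSet when the value list is longer than spark.sql.optimizer.inSetConversionThreshold, which defaults to 10; a shorter list would stay a plain In and never reach the UTF8String code path. Alternatively (an option, not something this commit does), the threshold could be lowered to trigger the rewrite with fewer values:

    // spark.conf.set("spark.sql.optimizer.inSetConversionThreshold", "2")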
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.encoders

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LeafNode

// A dummy logical plan that can hold expressions and go through optimizer rules.
case class DummyExpressionHolder(exprs: Seq[Expression]) extends LeafNode {
  override lazy val resolved = true
  override def output: Seq[Attribute] = Nil
}
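A brief usage sketch (mirroring the optimize helper in the test above): because the holder reports itself as resolved and exposes its expressions via a constructor argument, any Rule[LogicalPlan] that transforms expressions can rewrite them in place:

    // val plan      = DummyExpressionHolder(Seq(someResolvedExpr))
    // val optimized = OptimizeIn(plan).asInstanceOf[DummyExpressionHolder].exprs.head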
