[HUDI-4851] Fixing handling of UTF8String w/in InSet operator #6739

Merged · 4 commits · Sep 22, 2022
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, EqualNullSafe, EqualTo, Expression, ExtractValue, GetStructField, GreaterThan, GreaterThanOrEqual, In, InSet, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not, Or, StartsWith, SubqueryExpression}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.hudi.ColumnStatsExpressionUtils._
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{AnalysisException, HoodieCatalystExpressionUtils}
import org.apache.spark.unsafe.types.UTF8String

@@ -234,7 +234,15 @@ object DataSkippingUtils extends Logging {
getTargetIndexedColumnName(attrRef, indexSchema)
.map { colName =>
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
-hset.map(value => genColumnValuesEqualToExpression(colName, Literal(value), targetExprBuilder)).reduce(Or)
+hset.map { value =>
+  // NOTE: [[Literal]] has a gap in that it can hold a [[UTF8String]] value, while [[Literal#apply]]
+  //       doesn't accept [[UTF8String]]. As such, we have to handle it separately
+  val lit = value match {
+    case str: UTF8String => Literal(str.toString)
+    case _ => Literal(value)
+  }
+  genColumnValuesEqualToExpression(colName, lit, targetExprBuilder)
+}.reduce(Or)
}

// Filter "expr(colA) not in (B1, B2, ...)"
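For context on the fix above, here is a minimal REPL-style sketch (ours, not part of the PR) of the Catalyst gap it works around: the companion Literal.apply(v: Any) only accepts external JVM types, so it throws for the internal UTF8String even though a Literal instance can hold a UTF8String value.

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.StringType
import org.apache.spark.unsafe.types.UTF8String

val value: Any = UTF8String.fromString("a0")

// Fine: the case-class constructor stores the UTF8String as the internal value.
val direct = Literal(value, StringType)

// Throws "Unsupported literal type ...": Literal.apply(v: Any) pattern-matches
// on external types (String, Int, ...) and has no case for UTF8String.
// val boom = Literal(value)

// Hence the conversion in the patch: round-trip UTF8String through String.
val safe = value match {
  case str: UTF8String => Literal(str.toString)
  case _ => Literal(value)
}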
@@ -21,7 +21,11 @@ import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.spark.sql.HoodieCatalystExpressionUtils.resolveExpr
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.encoders.DummyExpressionHolder
import org.apache.spark.sql.catalyst.expressions.{Expression, InSet, Not}
import org.apache.spark.sql.catalyst.optimizer.OptimizeIn
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.functions.{col, lower}
import org.apache.spark.sql.hudi.DataSkippingUtils
import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
@@ -93,7 +97,8 @@ class TestDataSkippingUtils extends HoodieClientTestBase with SparkAdapterSupport
spark.sqlContext.setConf(SESSION_LOCAL_TIMEZONE.key, "UTC")

val resolvedFilterExpr: Expression = resolveExpr(spark, sourceFilterExprStr, sourceTableSchema)
-val rows: Seq[String] = applyFilterExpr(resolvedFilterExpr, input)
+val optimizedExpr = optimize(resolvedFilterExpr)
+val rows: Seq[String] = applyFilterExpr(optimizedExpr, input)

assertEquals(expectedOutput, rows)
}
@@ -113,7 +118,6 @@ class TestDataSkippingUtils extends HoodieClientTestBase with SparkAdapterSupport
assertEquals(expectedOutput, rows)
}


@ParameterizedTest
@MethodSource(Array("testStringsLookupFilterExpressionsSource"))
def testStringsLookupFilterExpressions(sourceExpr: Expression, input: Seq[IndexRow], output: Seq[String]): Unit = {
@@ -134,6 +138,19 @@ class TestDataSkippingUtils extends HoodieClientTestBase with SparkAdapterSupport
assertEquals(output, rows)
}


private def optimize(expr: Expression): Expression = {
val rules: Seq[Rule[LogicalPlan]] =
OptimizeIn ::
Nil

val plan: LogicalPlan = DummyExpressionHolder(Seq(expr))

rules.foldLeft(plan) {
case (plan, rule) => rule.apply(plan)
}.asInstanceOf[DummyExpressionHolder].exprs.head
}

private def applyFilterExpr(resolvedExpr: Expression, input: Seq[IndexRow]): Seq[String] = {
val lookupFilter = DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr(resolvedExpr, indexSchema)
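
A note on why the test now routes filters through optimize (our reading of the change): resolveExpr yields an In expression, but at query time Spark's optimizer may already have rewritten it into InSet, which is the shape that exposed the bug. OptimizeIn only performs that rewrite when the value list exceeds spark.sql.optimizer.inSetConversionThreshold (10 by default), which is why the new lookup case in the hunk below uses eleven values, (0 to 10). To force the InSet path on shorter lists, the threshold can be lowered, e.g.:

// Assumes an active SparkSession named `spark`; the key is Spark's
// SQLConf.OPTIMIZER_INSET_CONVERSION_THRESHOLD.
spark.sql("SET spark.sql.optimizer.inSetConversionThreshold=3")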

@@ -324,6 +341,14 @@ object TestDataSkippingUtils {
IndexRow("file_3", valueCount = 1, -2, -1, 0)
),
Seq("file_1", "file_2")),
arguments(
s"B in (${(0 to 10).map(i => s"'a$i'").mkString(",")})",
Seq(
IndexRow("file_1", valueCount = 1, B_minValue = "a0", B_maxValue = "a10", B_nullCount = 0),
IndexRow("file_2", valueCount = 1, B_minValue = "b0", B_maxValue = "b10", B_nullCount = 0),
IndexRow("file_3", valueCount = 1, B_minValue = "a10", B_maxValue = "b20", B_nullCount = 0)
),
Seq("file_1", "file_3")),
arguments(
"A not in (0, 1)",
Seq(
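To see why the expected output of the new lookup case is file_1 and file_3, here is a sketch of ours that mirrors (rather than reuses) the column-stats pruning logic: a file can contain a string value v only if B_minValue <= v <= B_maxValue lexicographically, and the translated InSet filter ORs that check over all looked-up values.

val files = Seq(("file_1", "a0", "a10"), ("file_2", "b0", "b10"), ("file_3", "a10", "b20"))
val values = (0 to 10).map(i => s"a$i")

// Keep a file if any looked-up value falls within its [min, max] range.
val matched = files.collect {
  case (name, min, max) if values.exists(v => min.compareTo(v) <= 0 && v.compareTo(max) <= 0) => name
}
// matched == Seq("file_1", "file_3"): every "aN" sorts below file_2's "b0",
// and only "a10" lands inside file_3's ["a10", "b20"].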
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.encoders

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LeafNode

// A dummy logical plan that can hold expressions and go through optimizer rules.
case class DummyExpressionHolder(exprs: Seq[Expression]) extends LeafNode {
override lazy val resolved = true
override def output: Seq[Attribute] = Nil
}
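
Usage sketch for the helper above (ours; b, in and rewritten are illustrative names): wrapping loose expressions in DummyExpressionHolder lets any Rule[LogicalPlan], such as OptimizeIn, rewrite them without building a full query plan.

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, In, Literal}
import org.apache.spark.sql.catalyst.optimizer.OptimizeIn
import org.apache.spark.sql.types.StringType

val b = AttributeReference("B", StringType)()
val in = In(b, (0 to 10).map(i => Literal(s"a$i")))

// Run the rule over the holder, then unwrap the rewritten expression.
val rewritten = OptimizeIn(DummyExpressionHolder(Seq(in)))
  .asInstanceOf[DummyExpressionHolder].exprs.head
// rewritten is an InSet whose hset elements are UTF8String values, i.e.
// exactly the shape the DataSkippingUtils fix has to translate.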