
[SPARK-31553][SQL] Fix isInCollection for collection sizes above the optimisation threshold #28328

Closed
wants to merge 9 commits

Conversation

@MaxGekk (Member) commented Apr 24, 2020

What changes were proposed in this pull request?

The InSet expression expects input collections of internal Catalyst types; for example, hset must contain elements of UTF8String for a child of string type. This means isInCollection must convert user values to internal Catalyst values, but currently it doesn't perform the conversion. That leads to incorrect results for collection sizes above the threshold spark.sql.optimizer.inSetConversionThreshold.

The bug was introduced by #25754.
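
For illustration, here is a minimal sketch of the missing conversion step, assuming Spark's internal CatalystTypeConverters helper (the actual fix in this PR goes through lit(_).expr.eval() instead, see below):

```scala
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.unsafe.types.UTF8String

// User-facing Scala strings must become UTF8String values before they
// can be matched against InSet's hset of internal Catalyst values.
val userValues: Set[Any] = Set("0", "1", "20")
val catalystValues: Set[Any] = userValues.map(v => CatalystTypeConverters.convertToCatalyst(v))
assert(catalystValues.forall(_.isInstanceOf[UTF8String]))
```

Without this conversion, InSet compares UTF8String column values against plain String set elements and never finds a match.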

Why are the changes needed?

The changes fix incorrect behaviour of isInCollection. For example, if the SQL config spark.sql.optimizer.inSetConversionThreshold is set to 10 (the default):

```scala
val set = (0 to 20).map(_.toString).toSet
val data = Seq("1").toDF("x")
data.select($"x".isInCollection(set).as("isInCollection")).show()
```

The function must return true because "1" is in the set of "0" ... "20", but it returns false:

```
+--------------+
|isInCollection|
+--------------+
|         false|
+--------------+
```

Does this PR introduce any user-facing change?

Yes

How was this patch tested?

```scala
def testExplainTime(collectionSize: Int) = {
  val df = spark.range(10).withColumn("id2", col("id") + 1)
  val list = Range(0, collectionSize).toList
  val startTime = System.currentTimeMillis()
  df.where(col("id").isInCollection(list)).where(col("id2").isInCollection(list)).explain()
  val elapsedTime = System.currentTimeMillis() - startTime
  println(s"cost time: ${elapsedTime}ms")
}
```

Then, testing with collection sizes 5, 10, 100, 1000, and 10000 gives the following results:

| collection size | explain time (before) | explain time (after) | w/o optimization |
|----------------:|----------------------:|---------------------:|-----------------:|
| 5               | 64ms                  | 65ms                 | 62ms             |
| 10              | 68ms                  | 64ms                 | 88ms             |
| 100             | 41ms                  | 162ms                | 227ms            |
| 1000            | 98ms                  | 406ms                | 652ms            |
| 10000           | 654ms                 | 2579ms               | 4504ms           |

@MaxGekk (Member, Author) commented Apr 24, 2020

@cloud-fan @HyukjinKwon Please review the PR.

@@ -519,7 +519,9 @@ case class InSet(child: Expression, hset: Set[Any]) extends UnaryExpression with

override def sql: String = {
val valueSQL = child.sql
val listSQL = hset.toSeq.map(Literal(_).sql).mkString(", ")
Contributor:

what's wrong with Literal.sql?

Member Author:

It doesn't accept UTF8String at

Member:

This seems to be an orthogonal fix, @cloud-fan and @MaxGekk. We need this in branch-2.4 because SPARK-12593 (converting resolved logical plans back to SQL) has been there since Apache Spark 2.0.0, don't we?

If you don't mind, please file a separate JIRA issue with a separate test case. We need to merge this separately.
cc @holdenk

Contributor:

Please let me know the JIRA when you file it and I'll add it to my tracking for the 2.4.6 release.

Member Author:

> If you don't mind, please file a separate JIRA issue with a separate test case. We need to merge this separately.

So far, I have not found how to trigger the issue in the sql method without this fix. I will think about it and try tomorrow, but if you have any ideas, you are welcome.

What I have already tried is to build a dataset with IsIn; the optimizer converts it to InSet, but I wasn't able to call sql() on the replaced expression.

Member Author:

Here is the PR #28343

@cloud-fan (Contributor) left a comment:

good catch!

@SparkQA

SparkQA commented Apr 24, 2020

Test build #121756 has finished for PR 28328 at commit 47a1e44.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk (Member, Author) commented Apr 24, 2020

The build (#28328 (comment)) fails on org.apache.spark.sql.hive.thriftserver.CliSuite, "SPARK-11188 Analysis error reporting". I don't think it is related to the changes.

@MaxGekk (Member, Author) commented Apr 24, 2020

jenkins, retest this, please

@@ -869,4 +869,15 @@ class ColumnExpressionSuite extends QueryTest with SharedSparkSession {
df.select(typedLit(("a", 2, 1.0))),
Row(Row("a", 2, 1.0)) :: Nil)
}

test("SPARK-31553: isInCollection for collection sizes above a threshold") {
Member:

Thank you, @MaxGekk .

cc @aokolnychyi and @dbtsai

```diff
 } else {
-  In(expr, values.toSeq.map(lit(_).expr))
+  In(expr, exprValues)
```
Member:

So, this is caused by SPARK-29048 (Improve performance on Column.isInCollection() with a large size collection, #25754 ) and only affects 3.0.0, right?

Member Author:

Correct

Member:

Thanks for confirming, @MaxGekk .
cc @WeichenXu123 and @gatorsmile

@dongjoon-hyun (Member)

This is a nice catch, @MaxGekk. As I wrote in the comment, it would be great if we could proceed with the two PRs separately. Thanks!

@SparkQA

SparkQA commented Apr 24, 2020

Test build #121762 has finished for PR 28328 at commit 47a1e44.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member)

I merged #28343 first. Could you rebase this PR, @MaxGekk ? Thanks.

…lection

# Conflicts:
#	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
#	sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@MaxGekk (Member, Author) commented Apr 25, 2020

@dongjoon-hyun I have rebased but the test from this PR starts failing. The function convertToScala(elem, child.dataType) doesn't convert UTF8String to String because child.dataType is NullType when InSet is created from isInCollection.

The test passed here because I didn't wrap the result of convertToScala in Literal.
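
A minimal illustration of the behavior described above, assuming Spark's CatalystTypeConverters API (the NullType pass-through is the crux of the failure):

```scala
import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
import org.apache.spark.sql.types.{NullType, StringType}
import org.apache.spark.unsafe.types.UTF8String

val v = UTF8String.fromString("1")
// With the real element type, the internal value converts back to a Scala String.
convertToScala(v, StringType)
// With NullType (what the child reports here), no string converter applies,
// so the UTF8String passes through unconverted.
convertToScala(v, NullType)
```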

Probably we need to revert 7d8216a. Sorry, my bad.

@MaxGekk (Member, Author) commented Apr 25, 2020

@dongjoon-hyun @cloud-fan Regarding this PR, WDYT about reverting the optimization #25754 instead of fixing it in this PR?

@dongjoon-hyun (Member)

It's possible. I guess we need @gatorsmile's opinion since he merged that one.

@MaxGekk (Member, Author) commented Apr 25, 2020

I have fixed the test failure after rebasing on #28343 by passing the element type from the place where the type is known: 67f34a1. I could open a follow-up PR for #28343. @dongjoon-hyun Let me know if you are ok with that.

@SparkQA

SparkQA commented Apr 25, 2020

Test build #121815 has finished for PR 28328 at commit 67f34a1.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class InSet(

@SparkQA

SparkQA commented Apr 26, 2020

Test build #121818 has finished for PR 28328 at commit dd69aa6.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu (Member) commented Apr 26, 2020

retest this please

@SparkQA

SparkQA commented Apr 26, 2020

Test build #121826 has finished for PR 28328 at commit dd69aa6.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk (Member, Author) commented Apr 26, 2020

jenkins, retest this, please

```scala
case class InSet(
    child: Expression,
    hset: Set[Any],
    hsetElemType: DataType) extends UnaryExpression with Predicate {
```
Member Author:

Matching internal Catalyst types to external types is ambiguous. For example:
Long -> Long
Long -> Timestamp

Also, the type of child can be unknown at the point where InSet has to know the Catalyst type of hset's elements.

hsetElemType is needed to eliminate the ambiguity.
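
Here is a minimal sketch of that ambiguity using Literal (illustrative only, not the PR's code):

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.{LongType, TimestampType}

// The same internal Catalyst value (a Long) backs two different external
// types, so the external type cannot be recovered from the value alone.
val asLong      = Literal(1588000000000000L, LongType)
val asTimestamp = Literal(1588000000000000L, TimestampType)
println(asLong.sql)      // rendered as a BIGINT literal
println(asTimestamp.sql) // rendered as a TIMESTAMP literal
```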

Member:

Do you think we can make this Option[DataType] because only a few things are ambiguous?

Member Author:

We can, but if a caller passes None, InSet will not be able to infer the element type when child.dataType is NullType, as in this case; dataType returns NullType if child is PrettyAttribute.
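
A minimal sketch of why the fallback cannot work, assuming PrettyAttribute's default dataType:

```scala
import org.apache.spark.sql.catalyst.expressions.PrettyAttribute
import org.apache.spark.sql.types.NullType

// PrettyAttribute defaults its dataType to NullType, so an InSet whose
// child is a PrettyAttribute cannot infer the set's element type from it.
val pretty = PrettyAttribute("x")
assert(pretty.dataType == NullType)
```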

Contributor:

When can hsetElemType be different from child.dataType?

Member Author:

When InSet is created from isInCollection; in that case child.dataType is NullType. For example, it is NullType in the test https://github.com/apache/spark/pull/28328/files#diff-aa655ba249e00d2591b21cf6a360cf82R886 because child is a PrettyAttribute when the sql method is called.

Member Author:

And InSet.sql() is called from Dataset.select via _.named:

Project(untypedCols.map(_.named), logicalPlan)

The named method calls toPrettySQL(expr):

case expr: Expression => Alias(expr, toPrettySQL(expr))()

The toPrettySQL method calls sql:

def toPrettySQL(e: Expression): String = usePrettyExpression(e).sql

@SparkQA

SparkQA commented Apr 26, 2020

Test build #121840 has finished for PR 28328 at commit 05ce50a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk (Member, Author) commented Apr 26, 2020

Failures of CliSuite are quite annoying (#28328 (comment)). @dongjoon-hyun @cloud-fan @gatorsmile This is the PR that repeats tests from CliSuite: #28329

@MaxGekk (Member, Author) commented Apr 26, 2020

jenkins, retest this, please

@SparkQA

SparkQA commented Apr 26, 2020

Test build #121835 has finished for PR 28328 at commit dd69aa6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 26, 2020

Test build #121844 has finished for PR 28328 at commit 05ce50a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

cc @viirya, this is another instance where merging In and InSet could fix the issue.

@cloud-fan (Contributor)

Actually this PR shows we still need InSet, to make the analyzer fast...

```scala
val exprValues = values.toSeq.map(lit(_).expr)
if (exprValues.size > SQLConf.get.optimizerInSetConversionThreshold) {
  val elemType = exprValues.headOption.map(_.dataType).getOrElse(NullType)
  InSet(expr, exprValues.map(_.eval()).toSet, elemType)
```
Contributor:

How can we make sure the expr has the same data type as exprValues? Do we have a type coercion rule for it?

Member Author:

To make sure, we need something similar to In.checkInputDataTypes() in InSet:

```scala
override def checkInputDataTypes(): TypeCheckResult = {
  val mismatchOpt = list.find(l => !DataType.equalsStructurally(l.dataType, value.dataType,
    ignoreNullability = true))
  if (mismatchOpt.isDefined) {
    TypeCheckResult.TypeCheckFailure(s"Arguments must be same type but were: " +
      s"${value.dataType.catalogString} != ${mismatchOpt.get.dataType.catalogString}")
  } else {
    TypeUtils.checkForOrderingExpr(value.dataType, s"function $prettyName")
  }
}
```

I could add such a check in this PR if you don't mind.

Member Author:

I added a similar check to InSet.

@viirya (Member) commented Apr 27, 2020

> Actually this PR shows we still need InSet, to make the analyzer fast...

What does that mean? We optimize In with InSet in the optimizer, right?

@cloud-fan (Contributor)

An In with many values is slow to analyze, as the type coercion rules or In.resolved are very slow.
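
A simplified illustration of the cost (standalone sketch, not Spark source; the constants are made up):

```scala
// Resolution checks such as In.checkInputDataTypes scan every child,
// and analyzer rules re-ask `resolved` many times, so analysis cost
// grows with the size of the value list.
final case class FakeIn(valueType: String, listTypes: Seq[String]) {
  def resolved: Boolean = listTypes.forall(_ == valueType) // O(list size) per call
}

val in = FakeIn("int", Seq.fill(100000)("int"))
(1 to 50).foreach(_ => in.resolved) // ~50 rule invocations x 100000 comparisons
```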

@MaxGekk (Member, Author) commented Apr 27, 2020

I have updated the PR's description and added a column w/o optimization. I got the numbers by running this code:

  test("isInCollection benchmark") {
    def testExplainTime(collectionSize: Int) = {
      val df = spark.range(10).withColumn("id2", col("id") + 1)
      val list = Range(0, collectionSize).toList
      val startTime = System.currentTimeMillis()
      df.where(col("id").isInCollection(list)).where(col("id2").isInCollection(list)).explain()
      val elapsedTime = System.currentTimeMillis() - startTime
      println(s"cost time: ${elapsedTime}ms")
    }
    withSQLConf(SQLConf.OPTIMIZER_INSET_CONVERSION_THRESHOLD.key -> "100000000") {
      testExplainTime(1)
      testExplainTime(5)
      testExplainTime(10)
      testExplainTime(100)
      testExplainTime(1000)
      testExplainTime(10000)
    }
  }

@SparkQA

SparkQA commented Apr 27, 2020

Test build #121899 has finished for PR 28328 at commit 4bc0e26.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member) commented Apr 27, 2020

> An In with many values is slow to analyze, as the type coercion rules or In.resolved are very slow.

That's a pain point. But when we merge In and InSet, can we have a constructor similar to the current InSet? We could have both list and hset in In. For example, in isInCollection, we could still construct an In which has empty children but only a hset (something like the sketch below)?
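
A standalone, hypothetical sketch of that shape (not Spark code), where one In node carries either expression children for small lists or a pre-built set above the threshold:

```scala
sealed trait Expr
final case class Col(name: String) extends Expr

// Merged node: `list` drives analysis for small collections; `hset`
// bypasses per-element children for large ones.
final case class In(
    value: Expr,
    list: Seq[Expr] = Seq.empty,
    hset: Option[Set[Any]] = None) extends Expr

// isInCollection could then build the set variant directly:
val large = In(Col("x"), hset = Some((0 to 20).map(_.toString).toSet))
```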

@HyukjinKwon (Member)

Okay, the more I look, the more it makes me think we should revert #25754 rather than adding band-aid fixes. Shall we revert?

@MaxGekk (Member, Author) commented Apr 28, 2020

After offline discussion with @gatorsmile @cloud-fan @HyukjinKwon, we decided to revert #25754. I will open a PR for that and close this PR.

@cloud-fan cloud-fan closed this in b7cabc8 Apr 28, 2020
cloud-fan pushed a commit that referenced this pull request Apr 28, 2020
…n.isInCollection() with a large size collection"

### What changes were proposed in this pull request?
This reverts commit 5631a96.

Closes #28328

### Why are the changes needed?
The PR #25754 introduced a bug in `isInCollection`. For example, if the SQL config `spark.sql.optimizer.inSetConversionThreshold` is set to 10 (by default):
```scala
val set = (0 to 20).map(_.toString).toSet
val data = Seq("1").toDF("x")
data.select($"x".isInCollection(set).as("isInCollection")).show()
```
The function must return **'true'** because "1" is in the set of "0" ... "20" but it returns "false":
```
+--------------+
|isInCollection|
+--------------+
|         false|
+--------------+
```

### Does this PR introduce any user-facing change?
Yes

### How was this patch tested?
```
$ ./build/sbt "test:testOnly *ColumnExpressionSuite"
```

Closes #28388 from MaxGekk/fix-isInCollection-revert.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit b7cabc8)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@MaxGekk MaxGekk deleted the fix-isInCollection branch June 5, 2020 19:48