[SPARK-31553][SQL] Fix isInCollection for collection sizes above the optimisation threshold #28328

Closed · wants to merge 9 commits · changes shown from 4 commits
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions

 import scala.collection.immutable.TreeSet
 
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReference
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
@@ -519,7 +519,9 @@ case class InSet(child: Expression, hset: Set[Any]) extends UnaryExpression with

   override def sql: String = {
     val valueSQL = child.sql
-    val listSQL = hset.toSeq.map(Literal(_).sql).mkString(", ")
Contributor:
what's wrong with Literal.sql?

Member Author:
It doesn't accept UTF8String at

Member:
This seems to be an orthogonal fix, @cloud-fan and @MaxGekk. We need this in branch-2.4 because it falls under SPARK-12593 (Converts resolved logical plan back to SQL), which has been in Apache Spark since 2.0.0, don't we?

If you don't mind, please file a separate JIRA issue with a separate test case. We need to merge this separately.
cc @holdenk

Contributor:
Please let me know the JIRA when you file it and I'll add it to my tracking for the 2.4.6 release.

Member Author:
> If you don't mind, please file a separate JIRA issue with a separate test case. We need to merge this separately.

So far, I have not found a way to trigger the issue in the `sql` method without this fix. I will think about it and try again tomorrow, but if you have any ideas, you are welcome to share them.

What I have already tried: building a dataset with `isin`, which the optimizer converts to `InSet`, but I was not able to call `sql()` on the replaced expression.

Member Author:
Here is the PR #28343

+    val listSQL = hset.toSeq
+      .map(CatalystTypeConverters.convertToScala(_, child.dataType))
+      .mkString(", ")
     s"($valueSQL IN ($listSQL))"
   }
 }
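
A side note on the thread above, not part of the PR diff: a minimal sketch of why `Literal(_)` fails on the values `InSet` stores and how `CatalystTypeConverters.convertToScala` helps. The specific value and type here are illustrative assumptions.

```scala
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.StringType
import org.apache.spark.unsafe.types.UTF8String

// InSet holds Catalyst-internal values, e.g. UTF8String for StringType.
val internal = UTF8String.fromString("abc")

// Literal.apply only matches external Scala types, so this line would
// fail with an "Unsupported literal type" error if uncommented:
// Literal(internal).sql

// Converting back to the external representation first works:
val external = CatalystTypeConverters.convertToScala(internal, StringType)
Literal(external).sql   // renders as 'abc'
```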
8 changes: 4 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -828,11 +828,11 @@ class Column(val expr: Expression) extends Logging {
    * @since 2.4.0
    */
   def isInCollection(values: scala.collection.Iterable[_]): Column = withExpr {
-    val hSet = values.toSet[Any]
-    if (hSet.size > SQLConf.get.optimizerInSetConversionThreshold) {
-      InSet(expr, hSet)
+    val exprValues = values.toSeq.map(lit(_).expr)
+    if (exprValues.size > SQLConf.get.optimizerInSetConversionThreshold) {
+      InSet(expr, exprValues.map(_.eval()).toSet)
     } else {
-      In(expr, values.toSeq.map(lit(_).expr))
+      In(expr, exprValues)
Member:
So, this is caused by SPARK-29048 (Improve performance on Column.isInCollection() with a large size collection, #25754) and only affects 3.0.0, right?

Member Author:
Correct

Member:
Thanks for confirming, @MaxGekk.
cc @WeichenXu123 and @gatorsmile

     }
   }
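
To make the regression concrete, here is a minimal repro sketch, assuming a local SparkSession (for example in spark-shell) on an affected 3.0.0 build; it is an illustration based on the discussion above, not code from the PR.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Lower the conversion threshold so the InSet branch is taken.
spark.conf.set("spark.sql.optimizer.inSetConversionThreshold", "10")

val values = (0 until 20).map(_.toString)   // 20 elements, above the threshold
val df = Seq("10").toDF("x")

// Before the fix, InSet received raw Scala strings rather than Catalyst
// UTF8String values, so the membership test evaluated to false; with the
// fix it evaluates to true.
df.select($"x".isInCollection(values)).show()
```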

@@ -869,4 +869,15 @@ class ColumnExpressionSuite extends QueryTest with SharedSparkSession {
       df.select(typedLit(("a", 2, 1.0))),
       Row(Row("a", 2, 1.0)) :: Nil)
   }

test("SPARK-31553: isInCollection for collection sizes above a threshold") {
Member:
Thank you, @MaxGekk.

cc @aokolnychyi and @dbtsai

+    val threshold = 100
+    withSQLConf(SQLConf.OPTIMIZER_INSET_CONVERSION_THRESHOLD.key -> threshold.toString) {
+      val set = (0 until 2 * threshold).map(_.toString).toSet
+      val elem = "10"
+      val data = Seq(elem).toDF("x")
+      assert(set.contains(elem))
+      checkAnswer(data.select($"x".isInCollection(set)), Row(true))
+    }
+  }
 }
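
As a usage note, one hedged way to observe which expression `isInCollection` produces on either side of the threshold, assuming a spark-shell session where `spark.implicits._` is in scope:

```scala
// The default value of spark.sql.optimizer.inSetConversionThreshold is 10.
val small = (0 until 5).map(_.toString)    // below the threshold, stays as In
val large = (0 until 50).map(_.toString)   // above the threshold, becomes InSet

Seq("1").toDF("x").select($"x".isInCollection(small)).explain()
Seq("1").toDF("x").select($"x".isInCollection(large)).explain()
// The second plan should show an INSET predicate instead of an IN list.
```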