
[SPARK-31500][SQL] collect_set() of BinaryType returns duplicate elements #28351

Closed

Conversation

Contributor

@planga82 planga82 commented Apr 26, 2020

What changes were proposed in this pull request?

The collect_set() aggregate function should produce a set of distinct elements. When the column argument's type is BinaryType this is not the case.

Example:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

case class R(id: String, value: String, bytes: Array[Byte])
def makeR(id: String, value: String) = R(id, value, value.getBytes)
val df = Seq(makeR("a", "dog"), makeR("a", "cat"), makeR("a", "cat"), makeR("b", "fish")).toDF()
// In the example below "bytesSet" erroneously has duplicates but "stringSet" does not (as expected).
df.agg(collect_set('value) as "stringSet", collect_set('bytes) as "bytesSet").show(truncate=false)
// The same problem is displayed when using window functions.
val win = Window.partitionBy('id).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
val result = df.select(
  collect_set('value).over(win) as "stringSet",
  collect_set('bytes).over(win) as "bytesSet"
)
.select('stringSet, 'bytesSet, size('stringSet) as "stringSetSize", size('bytesSet) as "bytesSetSize")
.show()

We use a HashSet buffer to accumulate the results. The problem is that array equality in Scala does not behave as expected: arrays are plain Java arrays, and == does not compare their contents:
Array(1, 2, 3) == Array(1, 2, 3)  // false
The result is that duplicates are not removed from the HashSet.
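For illustration, a minimal Scala REPL sketch of the behavior (not part of the patch):

```scala
import scala.collection.mutable

val a = "cat".getBytes
val b = "cat".getBytes

a == b             // false: == on Java arrays compares references, not contents
a.sameElements(b)  // true: content comparison

// Because equality is identity-based, a HashSet keeps both copies:
val set = mutable.HashSet[Any](a, b)
set.size           // 2, although the contents are identical
```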

The proposed solution is to change the type of the buffered elements so that duplicates are dropped, and then, in the last stage, when all the data is in the HashSet buffer, convert the elements back to the original type.
This transformation is only applied when the data type is BinaryType.
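A minimal sketch of the mechanism, using Spark's internal UnsafeArrayData (which, as the patch relies on, implements content-based equals/hashCode); simplified and illustrative only:

```scala
import scala.collection.mutable

import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData

val a = "cat".getBytes
val b = "cat".getBytes

// Wrapping each Array[Byte] as an UnsafeArrayData gives the HashSet
// content-based equality, so the duplicate is dropped while buffering:
val buffer = mutable.HashSet[Any](
  UnsafeArrayData.fromPrimitiveArray(a),
  UnsafeArrayData.fromPrimitiveArray(b))
buffer.size  // 1

// In the final stage the wrapped elements are converted back:
val bytes = buffer.iterator
  .map(_.asInstanceOf[UnsafeArrayData].toByteArray)
  .toArray
```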

Why are the changes needed?

Fixes the bug explained above.

Does this PR introduce any user-facing change?

Yes. Now collect_set() correctly deduplicates arrays of bytes.

How was this patch tested?

Unit testing

@planga82
Contributor Author

CC @hvanhovell @rxin

@planga82
Contributor Author

@hvanhovell I think now we have a better solution.

.select(struct($"x", $"y").as("a"))
val ret = df1.select(collect_set($"a")).collect()
.map(r => r.getAs[mutable.WrappedArray[_]](0)).head
assert(ret.length == 2)
Member

Could you move the new tests into a new test unit like test("SPARK-31500: collect_set() of BinaryType returns duplicate elements") { ..}?

@@ -139,6 +143,31 @@ case class CollectSet(

def this(child: Expression) = this(child, 0, 0)

/* SPARK-31500
* Array[Byte](BinaryType) Scala equality don't works as expected
Member

nit:

  /* 
   * SPARK-31500: Array[Byte](BinaryType) Scala equality don't works as expected

Contributor

It is not a Scala issue. Java byte arrays use referential equality and identity hash codes. This has tripped up many many people before.
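For illustration, the same point checked in a Scala REPL:

```scala
val a = "cat".getBytes
val b = "cat".getBytes

a.equals(b)                    // false: identity-based equals
a.hashCode == b.hashCode       // false in practice: identity hash codes
java.util.Arrays.equals(a, b)  // true: content-based comparison
```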

Contributor Author

Yes, I know the main reason; I'm going to explain it better.

@maropu
Member

maropu commented Apr 29, 2020

ok to test

@SparkQA

SparkQA commented Apr 29, 2020

Test build #122029 has finished for PR 28351 at commit 586c4b7.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -65,8 +67,10 @@ abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImper
new GenericArrayData(buffer.toArray)
}

lazy val typeChild = child.dataType
Contributor

Couple of things:

  • Can you name this bufferElementType or something like that?
  • Please make it protected.
  • I would prefer to make this an abstract method (lazy val is not really needed) and make the subclass implement it.

@@ -46,13 +46,15 @@ abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImper
// actual order of input rows.
override lazy val deterministic: Boolean = false

def getValueOnUpdate(value: Any): Any = InternalRow.copyValue(value)
Contributor

@hvanhovell hvanhovell Apr 29, 2020

Similar comments to the ones at typeChild (see the sketch after this list):

  • Make this protected.
  • Can we name this convertToBufferElement or something like that?
  • I would also prefer to make this an abstract method and have the subclass implement it.
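A sketch of the shape suggested by this comment and the earlier typeChild one (CollectShape is a stand-in name for illustration; the real class is the Collect aggregate):

```scala
import scala.collection.mutable

import org.apache.spark.sql.types.DataType

// Both members become protected and abstract in the base class,
// and each subclass supplies its own implementation.
trait CollectShape {
  protected def bufferElementType: DataType
  protected def convertToBufferElement(value: Any): Any

  // update() funnels every incoming value through the conversion hook:
  def update(buffer: mutable.HashSet[Any], value: Any): mutable.HashSet[Any] =
    buffer += convertToBufferElement(value)
}
```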

case other => other
}

override def getValueOnUpdate(value: Any): Any = InternalRow.copyValue(value) match {
Contributor

Please match on the datatype (or create a flag).

override def eval(buffer: mutable.HashSet[Any]): Any = {
val bufferUpdated: mutable.HashSet[Any] =
child.dataType match {
case BinaryType => buffer.map(_.asInstanceOf[UnsafeArrayData].toByteArray)
Contributor

The map call produces another set, which is not for free. We could do the following:

override def eval(buffer: mutable.HashSet[Any]): Any = {
  val array = child.dataType match {
    case BinaryType => buffer.iterator.map(_.asInstanceOf[UnsafeArrayData].toByteArray).toArray
    case _ => buffer.toArray
  }
  new GenericArrayData(array)
}

val bytesTest2 = "test2".getBytes
val df1 = Seq(bytesTest1, bytesTest1, bytesTest2).toDF("a")
val ret = df1.select(collect_set($"a")).collect()
.map(r => r.getAs[mutable.WrappedArray[_]](0)).head
Contributor

Please avoid casting things to their concrete implementation. This can break when we upgrade to a newer Scala version. In this case, use Seq[_].

@planga82
Contributor Author

Thank you all for the comments, they are very interesting.

@SparkQA

SparkQA commented Apr 29, 2020

Test build #122070 has finished for PR 28351 at commit 2cd1a22.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 29, 2020

Test build #122077 has finished for PR 28351 at commit a5b1dd3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

override def convertToBufferElement(value: Any): Any = {
val v = InternalRow.copyValue(value)
Contributor

Nit: you only need to copy for the default case.

Contributor Author

Yes, it's true. It's cleaner that way.

@@ -17,6 +17,7 @@

package org.apache.spark.sql

import scala.collection.mutable
Contributor

Do we still need this import?

Contributor Author

We don't need it, I forgot to remove it, thanks.

override def eval(buffer: mutable.HashSet[Any]): Any = {
val array = child.dataType match {
case BinaryType =>
buffer.iterator.map(_.asInstanceOf[UnsafeArrayData].toByteArray).toArray
Contributor

It is a bit safer to cast to ArrayData here.

Contributor Author

OK

@SparkQA

SparkQA commented Apr 30, 2020

Test build #122138 has finished for PR 28351 at commit 7ea0059.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@hvanhovell hvanhovell left a comment

LGTM

@dongjoon-hyun
Member

dongjoon-hyun commented May 1, 2020

Hi, @hvanhovell. What about your comment? Are we going to merge this as-is, or do you want to revise the comment more?

@dongjoon-hyun
Member

cc @holdenk since this is a correctness issue in Apache Spark 2.0.2 ~ 2.4.5 at least.

* Array[Byte](BinaryType) Scala equality don't works as expected
* so HashSet return duplicates, we have to change types to drop
* this duplicates and make collect_set work as expected for this
* data type
Member

Could you make this comment clearer for others and move it to lines 163-164?

Contributor Author

Ok, it's better to move it to that line. I've tried to clarify the message.

@@ -530,6 +530,26 @@ class DataFrameAggregateSuite extends QueryTest
)
}

test("SPARK-31500: collect_set() of BinaryType returns duplicate elements") {
Member

nit: collect_set() of BinaryType should not return duplicate elements?

val df = Seq(bytesTest1, bytesTest1, bytesTest2).toDF("a")
val ret = df.select(collect_set($"a")).collect()
.map(r => r.getAs[Seq[_]](0)).head
assert(ret.length == 2)
Member

nit: checkAnswer(df.select(size(collect_set($"a"))), Row(2) :: Nil)?

.select(struct($"x", $"y").as("a"))
val ret1 = df1.select(collect_set($"a")).collect()
.map(r => r.getAs[Seq[_]](0)).head
assert(ret1.length == 2)
Member

nit: checkAnswer(df1.select(size(collect_set($"a"))), Row(2) :: Nil)?

Member

+1

Contributor Author

Shorter way to compare, good idea!

* data type
*/
override lazy val bufferElementType = child.dataType match {
case BinaryType => ArrayType(BinaryType)
Member

ArrayType(ByteType)?

Contributor Author

Good catch. It works either way, but I think ArrayType(ByteType) describes the buffered value (an array of bytes) more accurately.


override def convertToBufferElement(value: Any): Any = child.dataType match {
/*
* SPARK-31500
Member

nit: I think we don't need this JIRA ID here.

Contributor Author

Ok, yes, we can look at the commit for more information.

* SPARK-31500
* collect_set() of BinaryType should not return duplicate elements,
* Java byte arrays use referential equality and identity hash codes
* so we need to use a different Scala object
Member

nit: a different Scala object -> a catalyst value for arrays?

Contributor Author

Ok

@maropu
Member

maropu commented May 1, 2020

Looks fine except for the minor comments.

@SparkQA

SparkQA commented May 1, 2020

Test build #122168 has finished for PR 28351 at commit 4782aea.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu maropu closed this in 4fecc20 May 1, 2020
maropu pushed a commit that referenced this pull request May 1, 2020
[SPARK-31500][SQL] collect_set() of BinaryType returns duplicate elements

### What changes were proposed in this pull request?

The collect_set() aggregate function should produce a set of distinct elements. When the column argument's type is BinaryType this is not the case.

Example:
```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

case class R(id: String, value: String, bytes: Array[Byte])
def makeR(id: String, value: String) = R(id, value, value.getBytes)
val df = Seq(makeR("a", "dog"), makeR("a", "cat"), makeR("a", "cat"), makeR("b", "fish")).toDF()
// In the example below "bytesSet" erroneously has duplicates but "stringSet" does not (as expected).
df.agg(collect_set('value) as "stringSet", collect_set('bytes) as "bytesSet").show(truncate=false)
// The same problem is displayed when using window functions.
val win = Window.partitionBy('id).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
val result = df.select(
  collect_set('value).over(win) as "stringSet",
  collect_set('bytes).over(win) as "bytesSet"
)
.select('stringSet, 'bytesSet, size('stringSet) as "stringSetSize", size('bytesSet) as "bytesSetSize")
.show()
```

We use a HashSet buffer to accumulate the results. The problem is that array equality in Scala does not behave as expected: arrays are plain Java arrays, and == does not compare their contents:
Array(1, 2, 3) == Array(1, 2, 3)  // false
The result is that duplicates are not removed from the HashSet.

The proposed solution is to change the type of the buffered elements so that duplicates are dropped, and then, in the last stage, when all the data is in the HashSet buffer, convert the elements back to the original type.
This transformation is only applied when the data type is BinaryType.

### Why are the changes needed?
Fixes the bug explained above.

### Does this PR introduce any user-facing change?
Yes. Now `collect_set()` correctly deduplicates arrays of bytes.

### How was this patch tested?
Unit testing

Closes #28351 from planga82/feature/SPARK-31500_COLLECT_SET_bug.

Authored-by: Pablo Langa <soypab@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
(cherry picked from commit 4fecc20)
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
@maropu
Member

maropu commented May 1, 2020

Thanks, all! Merged to master/branch-3.0/branch-2.4. cc: @dongjoon-hyun @holdenk

@SparkQA

SparkQA commented May 1, 2020

Test build #122170 has finished for PR 28351 at commit 67f55a6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

Thank you all.
