Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,17 @@ class OpenHashSet[@specialized(Long, Int, Double, Float) T: ClassTag](
this
}

/**
* Check if a key exists at the provided position using object equality rather than
* cooperative equality. Otherwise, hash sets will mishandle values for which `==`
* and `equals` return different results, like 0.0/-0.0 and NaN/NaN.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was told that in scala == is the same as equals, but eq is a different operator. I need to refresh my knowledge now :)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the differences are subtle:

scala> 0.0 == -0.0
val res0: Boolean = true

scala> 0.0 equals -0.0
val res1: Boolean = false

scala> Double.NaN == Double.NaN
val res2: Boolean = false

scala> Double.NaN equals Double.NaN
val res3: Boolean = true

There is a long discussion on the Scala forums from 2017 about this difference and some of the problems it causes:

Can we get rid of cooperative equality?

*
* See: https://issues.apache.org/jira/browse/SPARK-45599
*/
@annotation.nowarn("cat=other-non-cooperative-equals")
private def keyExistsAtPos(k: T, pos: Int) =
_data(pos) equals k

/**
* Add an element to the set. This one differs from add in that it doesn't trigger rehashing.
* The caller is responsible for calling rehashIfNeeded.
Expand All @@ -130,8 +141,7 @@ class OpenHashSet[@specialized(Long, Int, Double, Float) T: ClassTag](
_bitset.set(pos)
_size += 1
return pos | NONEXISTENCE_MASK
} else if (_data(pos) == k) {
// Found an existing key.
} else if (keyExistsAtPos(k, pos)) {
return pos
} else {
// quadratic probing with values increase by 1, 2, 3, ...
Expand Down Expand Up @@ -165,7 +175,7 @@ class OpenHashSet[@specialized(Long, Int, Double, Float) T: ClassTag](
while (true) {
if (!_bitset.get(pos)) {
return INVALID_POS
} else if (k == _data(pos)) {
} else if (keyExistsAtPos(k, pos)) {
return pos
} else {
// quadratic probing with values increase by 1, 2, 3, ...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,4 +249,34 @@ class OpenHashMapSuite extends SparkFunSuite with Matchers {
map(null) = null
assert(map.get(null) === Some(null))
}

test("SPARK-45599: 0.0 and -0.0 should count distinctly; NaNs should count together") {
// Exactly these elements provided in roughly this order trigger a condition where lookups of
// 0.0 and -0.0 in the bitset happen to collide, causing their counts to be merged incorrectly
// and inconsistently if `==` is used to check for key equality.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we mention the NaN behavior as well? All NaN values are all the same.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tweaked the test name. Is that what you had in mind?

This comment explains why we need exactly the following elements to trigger the 0.0/-0.0 miscount. It doesn't always happen (which is part of what kept this bug hidden for so long).

val spark45599Repro = Seq(
Double.NaN,
2.0,
168.0,
Double.NaN,
Double.NaN,
-0.0,
153.0,
0.0
)

val map1 = new OpenHashMap[Double, Int]()
spark45599Repro.foreach(map1.changeValue(_, 1, {_ + 1}))
assert(map1(0.0) == 1)
assert(map1(-0.0) == 1)
assert(map1(Double.NaN) == 3)

val map2 = new OpenHashMap[Double, Int]()
// Simply changing the order in which the elements are added to the map should not change the
// counts for 0.0 and -0.0.
spark45599Repro.reverse.foreach(map2.changeValue(_, 1, {_ + 1}))
assert(map2(0.0) == 1)
assert(map2(-0.0) == 1)
Comment thread
nchammas marked this conversation as resolved.
assert(map2(Double.NaN) == 3)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -269,4 +269,43 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers {
assert(pos1 == pos2)
}
}

test("SPARK-45599: 0.0 and -0.0 are equal but not the same") {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit tricky and it's better if we can find a reference system that defines this semantic. In Spark, 0.0 == -0.0, and in GROUP BY, 0.0 and -0.0 are considered to be in the same group and normalized to 0.0.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, probably make sense if we just fix this particular issue as OpenHashSet is used at many other places.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit tricky and it's better if we can find a reference system that defines this semantic.

scala> import java.util.HashSet
import java.util.HashSet

scala> val h = new HashSet[Double]()
val h: java.util.HashSet[Double] = []

scala> h.add(0.0)
val res0: Boolean = true

scala> h.add(-0.0)
val res1: Boolean = true

scala> h.size()
val res2: Int = 2

The doc for HashSet.add states:

More formally, adds the specified element e to this set if this set contains no element e2 such that Objects.equals(e, e2). If this set already contains the element, the call leaves the set unchanged and returns false.

In other words, java.util.HashSet uses equals and not ==, and therefore it considers 0.0 and -0.0 distinct elements.

So this PR brings OpenHashSet more in line with the semantics of java.util.HashSet.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Spark, 0.0 == -0.0, and in GROUP BY, 0.0 and -0.0 are considered to be in the same group and normalized to 0.0.

This PR does not change this behavior. I noticed, however, that we do not have any tests currently to check that -0.0 is normalized and grouped as you describe, so I went ahead and added such a test in 2bfc605.

Does this address your concern? Or are you suggesting that we should normalize -0.0 to 0.0 across the board?

Copy link
Copy Markdown
Contributor Author

@nchammas nchammas Feb 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider another interesting case where java.util.HashSet and OpenHashSet differ:

scala> val h = new HashSet[Double]()
val h: java.util.HashSet[Double] = []

scala> h.add(Double.NaN)
val res9: Boolean = true

scala> h.add(Double.NaN)
val res10: Boolean = false

scala> h.contains(Double.NaN)
val res11: Boolean = true

scala> h.size()
val res12: Int = 1

On master, OpenHashSet does something obviously wrong:

val set = new OpenHashSet[Double]()
set.add(Double.NaN)
set.add(Double.NaN)
set.size  // returns 2
set.contains(Double.NaN)  // returns false

This could possibly lead to a bug like the one reported in SPARK-45599 but in reverse, where a new NaN row is added rather than dropped. I will see if I can construct such a scenario as a demonstration. But regardless, I think this behavior is incorrect by itself.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note also that the docstring for OpenHashSet seems to imply that it is meant to be a faster but semantically equivalent alternative to java.util.HashSet:

* storage for four primitive types (Long, Int, Double, and Float). It is much faster than Java's
* standard HashSet while incurring much less memory overhead. This can serve as building blocks

If that's true, then we should perhaps add property based tests to ensure alignment between the two implementations, but I'll leave that as a potential future improvement.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spark's OpenHashSet does not have to match java.util.HashSet. What matters is the SQL semantic. Can you highlight which functions/operators are using this OpenHashSet and what is the impact of this change to the SQL semantic?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spark's OpenHashSet does not have to match java.util.HashSet. What matters is the SQL semantic.

Whether or not OpenHashSet matches java.util.HashSet, I want to emphasize for the record that OpenHashSet mishandles 0.0/-0.0 and NaN. Its behavior is simply incorrect. These tests fail on master in ways that can only be described as bugs, regardless of whatever SQL semantics we want to preserve.

This comment explains the root cause. Basically, it is a mistake to combine hash code-based lookups with cooperative equality, at least in the way we are doing it in OpenHashSet.

But I understand what you are saying. Fixing bugs in OpenHashSet doesn't help us if it also breaks users' SQL.

Can you highlight which functions/operators are using this OpenHashSet

I've updated the PR description with a summary of what uses OpenHashSet.

As a side note, I believe that if we accept the change proposed here, we should be able to eliminate SQLOpenHashSet. SQLOpenHashSet was created specifically to work around the bugs in OpenHashSet that we are addressing in this PR. See #33955 and #33993.

and what is the impact of this change to the SQL semantic?

I've updated the PR description with a diff of what tests pass or fail on master vs. this branch. Please take a look and let me know if you think we need any more tests. I know we are touching a sensitive code path and I appreciate the need for caution.

// Therefore, 0.0 and -0.0 should get separate entries in the hash set.
//
// Exactly these elements provided in roughly this order will trigger the following scenario:
// When probing the bitset in `getPos(-0.0)`, the loop will happen upon the entry for 0.0.
// In the old logic pre-SPARK-45599, the loop will find that the bit is set and, because
// -0.0 == 0.0, it will think that's the position of -0.0. But in reality this is the position
// of 0.0. So -0.0 and 0.0 will be stored at different positions, but `getPos()` will return
// the same position for them. This can cause users of OpenHashSet, like OpenHashMap, to
// return the wrong value for a key based on whether or not this bitset lookup collision
// happens.
Comment thread
nchammas marked this conversation as resolved.
val spark45599Repro = Seq(
Double.NaN,
2.0,
168.0,
Double.NaN,
Double.NaN,
-0.0,
153.0,
0.0
)
val set = new OpenHashSet[Double]()
spark45599Repro.foreach(set.add)
assert(set.size == 6)
val zeroPos = set.getPos(0.0)
val negZeroPos = set.getPos(-0.0)
assert(zeroPos != negZeroPos)
}

test("SPARK-45599: NaN and NaN are the same but not equal") {
// Any mathematical comparison to NaN will return false, but when we place it in
// a hash set we want the lookup to work like a "normal" value.
val set = new OpenHashSet[Double]()
set.add(Double.NaN)
set.add(Double.NaN)
assert(set.contains(Double.NaN))
assert(set.size == 1)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we actually waste space in OpenHashSet to store all the NaN values?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes sir. On master, this is the actual behavior of OpenHashSet:

// ...OpenHashSet$mcD$sp@21b327e6 did not contain NaN
assert(set.contains(Double.NaN))

// ...OpenHashSet$mcD$sp@1f09db1e had size 2 instead of expected size 1
assert(set.size == 1)

Every NaN will get its own entry in OpenHashSet on master. So if we add 1,000,000 NaNs to the set, NaN will have 1,000,000 entries in there. And .contains() will still return false. :D

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -747,3 +747,17 @@ select array_prepend(array(CAST(NULL AS String)), CAST(NULL AS String))
-- !query analysis
Project [array_prepend(array(cast(null as string)), cast(null as string)) AS array_prepend(array(CAST(NULL AS STRING)), CAST(NULL AS STRING))#x]
+- OneRowRelation


-- !query
select array_union(array(0.0, -0.0, DOUBLE("NaN")), array(0.0, -0.0, DOUBLE("NaN")))
-- !query analysis
Project [array_union(array(cast(0.0 as double), cast(0.0 as double), cast(NaN as double)), array(cast(0.0 as double), cast(0.0 as double), cast(NaN as double))) AS array_union(array(0.0, 0.0, NaN), array(0.0, 0.0, NaN))#x]
+- OneRowRelation


-- !query
select array_distinct(array(0.0, -0.0, -0.0, DOUBLE("NaN"), DOUBLE("NaN")))
-- !query analysis
Project [array_distinct(array(cast(0.0 as double), cast(0.0 as double), cast(0.0 as double), cast(NaN as double), cast(NaN as double))) AS array_distinct(array(0.0, 0.0, 0.0, NaN, NaN))#x]
+- OneRowRelation
Original file line number Diff line number Diff line change
Expand Up @@ -699,3 +699,10 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
"fragment" : "-x'2379ACFe'"
} ]
}


-- !query
select -0, -0.0
-- !query analysis
Project [0 AS 0#x, 0.0 AS 0.0#x]
+- OneRowRelation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,3 +747,17 @@ select array_prepend(array(CAST(NULL AS String)), CAST(NULL AS String))
-- !query analysis
Project [array_prepend(array(cast(null as string)), cast(null as string)) AS array_prepend(array(CAST(NULL AS STRING)), CAST(NULL AS STRING))#x]
+- OneRowRelation


-- !query
select array_union(array(0.0, -0.0, DOUBLE("NaN")), array(0.0, -0.0, DOUBLE("NaN")))
-- !query analysis
Project [array_union(array(cast(0.0 as double), cast(0.0 as double), cast(NaN as double)), array(cast(0.0 as double), cast(0.0 as double), cast(NaN as double))) AS array_union(array(0.0, 0.0, NaN), array(0.0, 0.0, NaN))#x]
+- OneRowRelation


-- !query
select array_distinct(array(0.0, -0.0, -0.0, DOUBLE("NaN"), DOUBLE("NaN")))
-- !query analysis
Project [array_distinct(array(cast(0.0 as double), cast(0.0 as double), cast(0.0 as double), cast(NaN as double), cast(NaN as double))) AS array_distinct(array(0.0, 0.0, 0.0, NaN, NaN))#x]
+- OneRowRelation
Original file line number Diff line number Diff line change
Expand Up @@ -1171,3 +1171,22 @@ Aggregate [c#x], [(c#x * 2) AS d#x]
+- Project [if ((a#x < 0)) 0 else a#x AS b#x]
+- SubqueryAlias t1
+- LocalRelation [a#x]


-- !query
SELECT col1, count(*) AS cnt
FROM VALUES
(0.0),
(-0.0),
(double('NaN')),
(double('NaN')),
(double('Infinity')),
(double('Infinity')),
(-double('Infinity')),
(-double('Infinity'))
GROUP BY col1
ORDER BY col1
-- !query analysis
Sort [col1#x ASC NULLS FIRST], true
+- Aggregate [col1#x], [col1#x, count(1) AS cnt#xL]
+- LocalRelation [col1#x]
Original file line number Diff line number Diff line change
Expand Up @@ -699,3 +699,10 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
"fragment" : "-x'2379ACFe'"
} ]
}


-- !query
select -0, -0.0
-- !query analysis
Project [0 AS 0#x, 0.0 AS 0.0#x]
+- OneRowRelation
Expand Down
4 changes: 4 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/array.sql
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,7 @@ select array_prepend(CAST(null AS ARRAY<String>), CAST(null as String));
select array_prepend(array(), 1);
select array_prepend(CAST(array() AS ARRAY<String>), CAST(NULL AS String));
select array_prepend(array(CAST(NULL AS String)), CAST(NULL AS String));

-- SPARK-45599: Confirm 0.0, -0.0, and NaN are handled appropriately.
select array_union(array(0.0, -0.0, DOUBLE("NaN")), array(0.0, -0.0, DOUBLE("NaN")));
select array_distinct(array(0.0, -0.0, -0.0, DOUBLE("NaN"), DOUBLE("NaN")));
15 changes: 15 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/group-by.sql
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,18 @@ FROM (
GROUP BY b
) t3
GROUP BY c;

-- SPARK-45599: Check that "weird" doubles group and sort as desired.
SELECT col1, count(*) AS cnt
FROM VALUES
(0.0),
(-0.0),
(double('NaN')),
(double('NaN')),
(double('Infinity')),
(double('Infinity')),
(-double('Infinity')),
(-double('Infinity'))
GROUP BY col1
ORDER BY col1
;
3 changes: 3 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/literals.sql
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,6 @@ select +X'1';
select -date '1999-01-01';
select -timestamp '1999-01-01';
select -x'2379ACFe';

-- normalize -0 and -0.0
select -0, -0.0;
16 changes: 16 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/ansi/array.sql.out
Original file line number Diff line number Diff line change
Expand Up @@ -907,3 +907,19 @@ select array_prepend(array(CAST(NULL AS String)), CAST(NULL AS String))
struct<array_prepend(array(CAST(NULL AS STRING)), CAST(NULL AS STRING)):array<string>>
-- !query output
[null,null]


-- !query
select array_union(array(0.0, -0.0, DOUBLE("NaN")), array(0.0, -0.0, DOUBLE("NaN")))
-- !query schema
struct<array_union(array(0.0, 0.0, NaN), array(0.0, 0.0, NaN)):array<double>>
-- !query output
[0.0,NaN]


-- !query
select array_distinct(array(0.0, -0.0, -0.0, DOUBLE("NaN"), DOUBLE("NaN")))
-- !query schema
struct<array_distinct(array(0.0, 0.0, 0.0, NaN, NaN)):array<double>>
-- !query output
[0.0,NaN]
Original file line number Diff line number Diff line change
Expand Up @@ -777,3 +777,11 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
"fragment" : "-x'2379ACFe'"
} ]
}


-- !query
select -0, -0.0
-- !query schema
struct<0:int,0.0:decimal(1,1)>
-- !query output
0 0.0
Expand Down
16 changes: 16 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/array.sql.out
Original file line number Diff line number Diff line change
Expand Up @@ -788,3 +788,19 @@ select array_prepend(array(CAST(NULL AS String)), CAST(NULL AS String))
struct<array_prepend(array(CAST(NULL AS STRING)), CAST(NULL AS STRING)):array<string>>
-- !query output
[null,null]


-- !query
select array_union(array(0.0, -0.0, DOUBLE("NaN")), array(0.0, -0.0, DOUBLE("NaN")))
-- !query schema
struct<array_union(array(0.0, 0.0, NaN), array(0.0, 0.0, NaN)):array<double>>
-- !query output
[0.0,NaN]


-- !query
select array_distinct(array(0.0, -0.0, -0.0, DOUBLE("NaN"), DOUBLE("NaN")))
-- !query schema
struct<array_distinct(array(0.0, 0.0, 0.0, NaN, NaN)):array<double>>
-- !query output
[0.0,NaN]
22 changes: 22 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/group-by.sql.out
Original file line number Diff line number Diff line change
Expand Up @@ -1102,3 +1102,25 @@ struct<d:int>
-- !query output
0
2


-- !query
SELECT col1, count(*) AS cnt
FROM VALUES
(0.0),
(-0.0),
(double('NaN')),
(double('NaN')),
(double('Infinity')),
(double('Infinity')),
(-double('Infinity')),
(-double('Infinity'))
GROUP BY col1
ORDER BY col1
-- !query schema
struct<col1:double,cnt:bigint>
-- !query output
-Infinity 2
0.0 2
Infinity 2
NaN 2
Original file line number Diff line number Diff line change
Expand Up @@ -777,3 +777,11 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
"fragment" : "-x'2379ACFe'"
} ]
}


-- !query
select -0, -0.0
-- !query schema
struct<0:int,0.0:decimal(1,1)>
-- !query output
0 0.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1089,6 +1089,39 @@ class DataFrameAggregateSuite extends QueryTest
)
}

test("SPARK-45599: Neither 0.0 nor -0.0 should be dropped when computing percentile") {
// To reproduce the bug described in SPARK-45599, we need exactly these rows in roughly
// this order in a DataFrame with exactly 1 partition.
// scalastyle:off line.size.limit
// See: https://issues.apache.org/jira/browse/SPARK-45599?focusedCommentId=17806954&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17806954
// scalastyle:on line.size.limit
val spark45599Repro: DataFrame = Seq(
0.0,
2.0,
153.0,
168.0,
3252411229536261.0,
7.205759403792794e+16,
1.7976931348623157e+308,
0.25,
Double.NaN,
Double.NaN,
-0.0,
-128.0,
Double.NaN,
Double.NaN
).toDF("val").coalesce(1)

checkAnswer(
spark45599Repro.agg(
percentile(col("val"), lit(0.1))
),
// With the buggy implementation of OpenHashSet, this returns `0.050000000000000044`
// instead of `-0.0`.
List(Row(-0.0))
)
}

test("any_value") {
checkAnswer(
courseSales.groupBy("course").agg(
Expand Down