Make the Merge Set accumulator thread safe
- Merge uses an accumulator to collect the set of matching AddFiles. The accumulator is not thread safe, which can cause tasks to fail in very large merges.
- This PR fixes the issue by wrapping the backing set with `java.util.Collections.synchronizedSet`.

GitOrigin-RevId: b4bf64ff3e39aab239e50c81c7b217da72fb78e6
rahulsmahadev authored and scottsand-db committed Apr 26, 2022
1 parent a31d0bc commit e5e8b84
Showing 1 changed file with 2 additions and 2 deletions.
@@ -22,10 +22,10 @@ import org.apache.spark.util.AccumulatorV2
  * Accumulator to collect distinct elements as a set.
  */
 class SetAccumulator[T] extends AccumulatorV2[T, java.util.Set[T]] {
-  private var _set: java.util.HashSet[T] = _
+  private var _set: java.util.Set[T] = _

   private def getOrCreate = {
-    _set = Option(_set).getOrElse(new java.util.HashSet[T]())
+    _set = Option(_set).getOrElse(java.util.Collections.synchronizedSet(new java.util.HashSet[T]()))
     _set
   }
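
To illustrate why the wrapper matters, here is a minimal standalone sketch (not the actual `SetAccumulator`, and with no Spark dependency) that adds elements to a `java.util.Collections.synchronizedSet` from several threads. The names `SynchronizedSetSketch` and `fillConcurrently` are hypothetical and exist only for this example.

```scala
import java.util.Collections
import java.util.concurrent.{Executors, TimeUnit}

object SynchronizedSetSketch {
  // Hypothetical helper: each of `threads` workers adds `perThread`
  // distinct integers to the shared set.
  def fillConcurrently(set: java.util.Set[Int], threads: Int, perThread: Int): Unit = {
    val pool = Executors.newFixedThreadPool(threads)
    (0 until threads).foreach { t =>
      pool.execute(new Runnable {
        override def run(): Unit = {
          var i = 0
          while (i < perThread) {
            set.add(t * perThread + i) // values are unique across workers
            i += 1
          }
        }
      })
    }
    pool.shutdown()
    // Wait for all workers to finish before the caller reads the set.
    pool.awaitTermination(1, TimeUnit.MINUTES)
  }

  def main(args: Array[String]): Unit = {
    // Same construction as in the diff: a HashSet wrapped by synchronizedSet.
    val safe = Collections.synchronizedSet(new java.util.HashSet[Int]())
    fillConcurrently(safe, threads = 8, perThread = 10000)
    println(s"synchronized set size: ${safe.size()}")
  }
}
```

Passing a plain `new java.util.HashSet[Int]()` to the same helper is unsafe: concurrent `add` calls on an unsynchronized `HashSet` can lose elements or corrupt its internal state. Note that the wrapper only guards individual method calls. Iterating over the returned set still requires the caller to synchronize on it manually.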
