Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace deprecated AccumulatorParam with AccumulatorV2 #149

Merged
merged 1 commit into from
Feb 11, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package org.bdgenomics.utils.interval.array

import com.esotericsoftware.kryo.io.{ Input, Output }
import com.esotericsoftware.kryo.{ Kryo, Serializer }
import org.apache.spark.AccumulatorParam
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import scala.annotation.tailrec
import scala.math.max
import scala.reflect.ClassTag
Expand All @@ -30,15 +30,34 @@ import scala.reflect.ClassTag
*/
object IntervalArray extends Serializable {

private class MaxAccumulatorParam extends AccumulatorParam[Long] {
private class MaxLongAccumulator extends AccumulatorV2[Long, Long] {
private var _max = 0L

def zero(initialValue: Long): Long = {
initialValue
override def isZero: Boolean = _max == 0L

override def copy(): AccumulatorV2[Long, Long] = {
val newAcc = new MaxLongAccumulator
newAcc._max = _max
newAcc
}

override def reset(): Unit = {
_max = 0L
}

override def add(v: Long): Unit = {
_max = max(_max, v)
}

def addInPlace(r1: Long, r2: Long): Long = {
max(r1, r2)
override def merge(other: AccumulatorV2[Long, Long]): Unit = other match {
case o: MaxLongAccumulator =>
_max = max(_max, o._max)
case _ =>
throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}

override def value: Long = _max
}

/**
Expand All @@ -52,16 +71,13 @@ object IntervalArray extends Serializable {
rdd: RDD[(K, T)],
buildFn: (Array[(K, T)], Long) => IntervalArray[K, T]): IntervalArray[K, T] = {

val accum = {
implicit val accumParam = new MaxAccumulatorParam

rdd.context.accumulator(0L)
}
val accum = new MaxLongAccumulator
rdd.context.register(accum, "max")

val sortedArray =
rdd.map(i => {
// we do this to get the width of the widest interval
accum += i._1.width
accum.add(i._1.width)

i
}).sortByKey()
Expand Down