Skip to content

Commit

Permalink
make MIMA happy
Browse files Browse the repository at this point in the history
  • Loading branch information
CodingCat committed Nov 22, 2014
1 parent 67593d2 commit 138f9b3
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 8 deletions.
24 changes: 19 additions & 5 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ class Accumulable[R, T] (
@transient initialValue: R,
param: AccumulableParam[R, T],
val name: Option[String],
val allowDuplicate: Boolean = true)
val allowDuplicate: Boolean)
extends Serializable {

def this(@transient initialValue: R, param: AccumulableParam[R, T],
allowDuplicate: Boolean = true) =
this(initialValue, param, None, allowDuplicate)
def this(@transient initialValue: R, param: AccumulableParam[R, T], name: Option[String]) =
this(initialValue, param, name, true)

def this(@transient initialValue: R, param: AccumulableParam[R, T]) =
this(initialValue, param, None, true)

val id: Long = Accumulators.newId

Expand Down Expand Up @@ -230,8 +232,12 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
* @tparam T result type
*/
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T],
name: Option[String], allowDuplicate: Boolean = true)
name: Option[String], allowDuplicate: Boolean)
extends Accumulable[T,T](initialValue, param, name, allowDuplicate) {

def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) =
this(initialValue, param, None, true)

def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)

def this(initialValue: T, param: AccumulatorParam[T], allowDuplicate: Boolean) =
Expand Down Expand Up @@ -290,6 +296,14 @@ private object Accumulators {
return ret
}

def add(values: Map[Long, Any]): Unit = synchronized {
for ((id, value) <- values) {
if (originals.contains(id)) {
originals(id).asInstanceOf[Accumulable[Any, Any]] ++= value
}
}
}

// Add values to the original accumulators with some given IDs
def add(value: (Long, Any)): Unit = synchronized {
if (originals.contains(value._1)) {
Expand Down
31 changes: 28 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,17 @@ class SparkContext(config: SparkConf) extends Logging {
* Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
* values to using the `+=` method. Only the driver can access the accumulator's `value`.
*/
def accumulator[T](initialValue: T, allowDuplicate: Boolean = true)
def accumulator[T](initialValue: T)
(implicit param: AccumulatorParam[T]) =
new Accumulator(initialValue, param, None, true)

/**
* Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
* values to using the `+=` method. Only the driver can access the accumulator's `value`.
*
* NOTE: allows user to specify if the accumulator allows duplicate update
*/
def accumulator[T](initialValue: T, name: String, allowDuplicate: Boolean)
(implicit param: AccumulatorParam[T]) =
new Accumulator(initialValue, param, None, allowDuplicate)

Expand All @@ -915,6 +925,7 @@ class SparkContext(config: SparkConf) extends Logging {
def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]) =
new Accumulable(initialValue, param)


/**
* Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the
* Spark UI. Tasks can add values to the accumuable using the `+=` operator. Only the driver can
Expand All @@ -932,9 +943,23 @@ class SparkContext(config: SparkConf) extends Logging {
* standard mutable collections. So you can use this with mutable Map, Set, etc.
*/
def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
(initialValue: R, allowDuplicate: Boolean = true): Accumulable[R, T] = {
(initialValue: R): Accumulable[R, T] = {
val param = new GrowableAccumulableParam[R,T]
new Accumulable(initialValue, param, None, true)
}

/**
* Create an accumulator from a "mutable collection" type.
*
* Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by
* standard mutable collections. So you can use this with mutable Map, Set, etc.
*
* NOTE: this allows user to define if the Accumulable allows duplciate update
*/
def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
(initialValue: R, allowDuplciate: Boolean): Accumulable[R, T] = {
val param = new GrowableAccumulableParam[R,T]
new Accumulable(initialValue, param, allowDuplicate)
new Accumulable(initialValue, param, None, allowDuplciate)
}

/**
Expand Down

0 comments on commit 138f9b3

Please sign in to comment.