Skip to content

Commit

Permalink
make MIMA happy
Browse files Browse the repository at this point in the history
  • Loading branch information
Nan Zhu authored and Nan Zhu committed Sep 25, 2014
1 parent af3ba6c commit af7ff02
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 11 deletions.
24 changes: 19 additions & 5 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ class Accumulable[R, T] (
@transient initialValue: R,
param: AccumulableParam[R, T],
val name: Option[String],
val allowDuplicate: Boolean = true)
val allowDuplicate: Boolean)
extends Serializable {

def this(@transient initialValue: R, param: AccumulableParam[R, T],
allowDuplicate: Boolean = true) =
this(initialValue, param, None, allowDuplicate)
def this(@transient initialValue: R, param: AccumulableParam[R, T], name: Option[String]) =
this(initialValue, param, name, true)

def this(@transient initialValue: R, param: AccumulableParam[R, T]) =
this(initialValue, param, None, true)

val id: Long = Accumulators.newId

Expand Down Expand Up @@ -229,8 +231,12 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
* @tparam T result type
*/
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T],
name: Option[String], allowDuplicate: Boolean = true)
name: Option[String], allowDuplicate: Boolean)
extends Accumulable[T,T](initialValue, param, name, allowDuplicate) {

def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) =
this(initialValue, param, None, true)

def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)

def this(initialValue: T, param: AccumulatorParam[T], allowDuplicate: Boolean) =
Expand Down Expand Up @@ -289,6 +295,14 @@ private object Accumulators {
return ret
}

def add(values: Map[Long, Any]): Unit = synchronized {
for ((id, value) <- values) {
if (originals.contains(id)) {
originals(id).asInstanceOf[Accumulable[Any, Any]] ++= value
}
}
}

// Add values to the original accumulators with some given IDs
def add(value: (Long, Any)): Unit = synchronized {
if (originals.contains(value._1)) {
Expand Down
61 changes: 55 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,17 @@ class SparkContext(config: SparkConf) extends Logging {
* Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
* values to using the `+=` method. Only the driver can access the accumulator's `value`.
*/
def accumulator[T](initialValue: T, allowDuplicate: Boolean = true)
def accumulator[T](initialValue: T)
(implicit param: AccumulatorParam[T]) =
new Accumulator(initialValue, param, None, true)

/**
* Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
* values to using the `+=` method. Only the driver can access the accumulator's `value`.
*
* NOTE: allows user to specify if the accumulator allows duplicate update
*/
def accumulator[T](initialValue: T, name: String, allowDuplicate: Boolean)
(implicit param: AccumulatorParam[T]) =
new Accumulator(initialValue, param, None, allowDuplicate)

Expand All @@ -763,9 +773,21 @@ class SparkContext(config: SparkConf) extends Logging {
* @tparam T accumulator type
* @tparam R type that can be added to the accumulator
*/
def accumulable[T, R](initialValue: T, allowDuplicate: Boolean = true)
def accumulable[T, R](initialValue: T)
(implicit param: AccumulableParam[T, R]) =
new Accumulable(initialValue, param, allowDuplicate)
new Accumulable(initialValue, param, None, true)

/**
* Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
* with `+=`. Only the driver can access the accumuable's `value`.
* @tparam T accumulator type
* @tparam R type that can be added to the accumulator
*
* NOTE: this allows user to specify where the Accumulable allows duplicate update
*/
def accumulable[T, R](initialValue: T, allowDuplicate: Boolean)
(implicit param: AccumulableParam[T, R]) =
new Accumulable(initialValue, param, None, allowDuplicate)

/**
* Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the
Expand All @@ -774,7 +796,20 @@ class SparkContext(config: SparkConf) extends Logging {
* @tparam T accumulator type
* @tparam R type that can be added to the accumulator
*/
def accumulable[T, R](initialValue: T, name: String, allowDuplicate: Boolean = true)
def accumulable[T, R](initialValue: T, name: String)
(implicit param: AccumulableParam[T, R]) =
new Accumulable(initialValue, param, Some(name), true)

/**
* Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the
* Spark UI. Tasks can add values to the accumuable using the `+=` operator. Only the driver can
* access the accumuable's `value`.
* @tparam T accumulator type
* @tparam R type that can be added to the accumulator
*
* NOTE: this allows user to specify where the Accumulable allows duplicate update
*/
def accumulable[T, R](initialValue: T, name: String, allowDuplicate: Boolean)
(implicit param: AccumulableParam[T, R]) =
new Accumulable(initialValue, param, Some(name), allowDuplicate)

Expand All @@ -785,9 +820,23 @@ class SparkContext(config: SparkConf) extends Logging {
* standard mutable collections. So you can use this with mutable Map, Set, etc.
*/
def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
(initialValue: R, allowDuplicate: Boolean = true): Accumulable[R, T] = {
(initialValue: R): Accumulable[R, T] = {
val param = new GrowableAccumulableParam[R,T]
new Accumulable(initialValue, param, None, true)
}

/**
* Create an accumulator from a "mutable collection" type.
*
* Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by
* standard mutable collections. So you can use this with mutable Map, Set, etc.
*
* NOTE: this allows user to define if the Accumulable allows duplciate update
*/
def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
(initialValue: R, allowDuplciate: Boolean): Accumulable[R, T] = {
val param = new GrowableAccumulableParam[R,T]
new Accumulable(initialValue, param, allowDuplicate)
new Accumulable(initialValue, param, None, allowDuplciate)
}

/**
Expand Down

0 comments on commit af7ff02

Please sign in to comment.