Permalink
Browse files

make accumulator.localValue public, add tests

Conflicts:
	core/src/test/scala/spark/AccumulatorSuite.scala
  • Loading branch information...
Imran Rashid
Imran Rashid committed Aug 14, 2012
1 parent d6d071f commit 82a3327862d678500ecebd004c1b5b04862aaad8
Showing with 25 additions and 1 deletion.
  1. +10 −1 core/src/main/scala/spark/Accumulators.scala
  2. +15 −0 core/src/test/scala/spark/AccumulatorSuite.scala
@@ -49,7 +49,16 @@ class Accumulable[R, T] (
else throw new UnsupportedOperationException("Can't read accumulator value in task")
}
private[spark] def localValue = value_
/**
 * Get the current value of this accumulator from within a task.
 *
 * This is NOT the global value of the accumulator. To get the global value after a
 * completed operation on the dataset, call `value`.
 *
 * The typical use of this method is to directly mutate the local value, e.g., to add
 * an element to a Set.
 */
def localValue = value_
def value_= (r: R) {
if (!deserialized) value_ = r
@@ -112,4 +112,19 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
sc.stop()
}
}
// Verifies that `localValue` can be read (and mutated) from inside a task,
// and that the merged global `value` still reflects every element added.
test ("localValue readable in tasks") {
  import SetAccum._
  val maxI = 1000
  for (nThreads <- List(1, 10)) { //test single & multi-threaded
    val sc = new SparkContext("local[" + nThreads + "]", "test")
    val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
    val d = sc.parallelize(1 to maxI)
    d.foreach {
      x => acc.localValue += x
    }
    acc.value should be ( (1 to maxI).toSet)
    // Stop the context before the next loop iteration creates a new one;
    // matches the sibling test and avoids leaking a live SparkContext.
    sc.stop()
  }
}
}

0 comments on commit 82a3327

Please sign in to comment.