Permalink
Browse files

fixed the optimisation of combine nodes to parDos

  • Loading branch information...
1 parent 21f2352 commit ee1435b3d904294ffad67188d0fe721a0f799da7 @etorreborre etorreborre committed Sep 17, 2012
@@ -488,8 +488,8 @@ object Smart {
def setup(env: Unit) = {}
def process(env: Unit, input: (K, Iterable[V]), emitter: Emitter[(K, V)]) = {
val key = input._1
- val values = input._2
- emitter.emit(key, values.tail.foldLeft(values.head)(f))
+ val values = input._2.toStream
+ emitter.emit(key, values.reduce(f))
}
def cleanup(env: Unit, emitter: Emitter[(K, V)]) = {}
}
@@ -24,9 +24,10 @@ class WordCountSpec extends NictaSimpleJobs {
"Counting words frequencies must return the frequency for each word" >> { implicit sc: SC =>
val frequencies =
- DList(repeat("hello" -> 3, "world" -> 4):_*).
+ DList(repeat("hello" -> 3, "world" -> 4, "universe" -> 2):_*).
flatMap(_.split(" ")).map((_, 1)).
groupByKey.
+ filter { case (word, n) => word.length < 6 }.
combine((i: Int, j: Int) => i + j)
frequencies.run.sorted must_== Seq(("hello", 3), ("world", 4))

0 comments on commit ee1435b

Please sign in to comment.