From a4dc0c0f3395aff3ecf952d70373c75ad6710c2e Mon Sep 17 00:00:00 2001 From: darionyaphet Date: Sun, 26 Jun 2016 18:47:13 +0800 Subject: [PATCH] fix #GEARPUMP-166 Rename sumByValue to sumByKey --- .../src/main/scala/akka/stream/gearpump/scaladsl/Api.scala | 6 +++--- .../scala/org/apache/gearpump/streaming/dsl/Stream.scala | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala index 9e35389d7..9cc46c9a4 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala @@ -243,7 +243,7 @@ object Implicits { * , otherwise, it will do the sum no matter what current key is. */ def sumOnValue(implicit numeric: Numeric[V]): Source[(K, V), Mat] = { - val stage = Reduce.apply(sumByValue[K, V](numeric)) + val stage = Reduce.apply(sumByKey[K, V](numeric)) source.via(stage) } } @@ -270,13 +270,13 @@ object Implicits { * */ def sumOnValue(implicit numeric: Numeric[V]): Flow[(K, V), (K, V), Mat] = { - val stage = Reduce.apply(sumByValue[K, V](numeric)) + val stage = Reduce.apply(sumByKey[K, V](numeric)) flow.via(stage) } } private def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1 - private def sumByValue[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V] = + private def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V] = (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2)) } \ No newline at end of file diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala index 5ca92dd54..786d496dc 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala @@ -172,7 +172,7 @@ class KVStream[K, V](stream: Stream[Tuple2[K, V]]) { * @return the sum stream */ def sum(implicit numeric: Numeric[V]): Stream[(K, V)] = { - stream.reduce(Stream.sumByValue[K, V](numeric), "sum") + stream.reduce(Stream.sumByKey[K, V](numeric), "sum") } } @@ -184,7 +184,7 @@ object Stream { def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1 - def sumByValue[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V] + def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V] = (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2)) implicit def streamToKVStream[K, V](stream: Stream[Tuple2[K, V]]): KVStream[K, V] = {