From bd8a4c2516147f1e99cf1f6e721346c18db23a20 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 1 Oct 2014 09:26:26 -0700 Subject: [PATCH] fix scala style --- .../streaming/api/python/PythonDStream.scala | 47 ++++++++++--------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala index 5ab15f717903e..5afcb84857350 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala @@ -47,7 +47,8 @@ private[python] class TransformFunction(@transient var pfunc: PythonTransformFun extends function.Function2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] with Serializable { def apply(rdd: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = { - Option(pfunc.call(time.milliseconds, List(rdd.map(JavaRDD.fromRDD(_)).orNull).asJava)).map(_.rdd) + Option(pfunc.call(time.milliseconds, List(rdd.map(JavaRDD.fromRDD(_)).orNull).asJava)) + .map(_.rdd) } def apply(rdd: Option[RDD[_]], rdd2: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = { @@ -133,8 +134,9 @@ private[python] object PythonDStream { /** * Base class for PythonDStream with some common methods */ -private[python] -abstract class PythonDStream(parent: DStream[_], @transient pfunc: PythonTransformFunction) +private[python] abstract class PythonDStream( + parent: DStream[_], + @transient pfunc: PythonTransformFunction) extends DStream[Array[Byte]] (parent.ssc) { val func = new TransformFunction(pfunc) @@ -152,9 +154,10 @@ abstract class PythonDStream(parent: DStream[_], @transient pfunc: PythonTransfo * If `reuse` is true and the result of the `func` is an PythonRDD, then it will cache it * as an template for future use, this can reduce the Python callbacks. */ -private[python] -class PythonTransformedDStream (parent: DStream[_], @transient pfunc: PythonTransformFunction, - var reuse: Boolean = false) +private[python] class PythonTransformedDStream ( + parent: DStream[_], + @transient pfunc: PythonTransformFunction, + var reuse: Boolean = false) extends PythonDStream(parent, pfunc) { // rdd returned by func @@ -191,9 +194,10 @@ class PythonTransformedDStream (parent: DStream[_], @transient pfunc: PythonTran /** * Transformed from two DStreams in Python. */ -private[python] -class PythonTransformed2DStream(parent: DStream[_], parent2: DStream[_], - @transient pfunc: PythonTransformFunction) +private[python] class PythonTransformed2DStream( + parent: DStream[_], + parent2: DStream[_], + @transient pfunc: PythonTransformFunction) extends DStream[Array[Byte]] (parent.ssc) { val func = new TransformFunction(pfunc) @@ -212,8 +216,9 @@ class PythonTransformed2DStream(parent: DStream[_], parent2: DStream[_], /** * similar to StateDStream */ -private[python] -class PythonStateDStream(parent: DStream[Array[Byte]], @transient reduceFunc: PythonTransformFunction) +private[python] class PythonStateDStream( + parent: DStream[Array[Byte]], + @transient reduceFunc: PythonTransformFunction) extends PythonDStream(parent, reduceFunc) { super.persist(StorageLevel.MEMORY_ONLY) @@ -233,13 +238,13 @@ class PythonStateDStream(parent: DStream[Array[Byte]], @transient reduceFunc: Py /** * similar to ReducedWindowedDStream */ -private[python] -class PythonReducedWindowedDStream(parent: DStream[Array[Byte]], - @transient preduceFunc: PythonTransformFunction, - @transient pinvReduceFunc: PythonTransformFunction, - _windowDuration: Duration, - _slideDuration: Duration - ) extends PythonDStream(parent, preduceFunc) { +private[python] class PythonReducedWindowedDStream( + parent: DStream[Array[Byte]], + @transient preduceFunc: PythonTransformFunction, + @transient pinvReduceFunc: PythonTransformFunction, + _windowDuration: Duration, + _slideDuration: Duration) + extends PythonDStream(parent, preduceFunc) { super.persist(StorageLevel.MEMORY_ONLY) override val mustCheckpoint = true @@ -252,8 +257,7 @@ class PythonReducedWindowedDStream(parent: DStream[Array[Byte]], override def compute(validTime: Time): Option[RDD[Array[Byte]]] = { val currentTime = validTime - val current = new Interval(currentTime - windowDuration, - currentTime) + val current = new Interval(currentTime - windowDuration, currentTime) val previous = current - slideDuration // _____________________________ @@ -266,11 +270,10 @@ class PythonReducedWindowedDStream(parent: DStream[Array[Byte]], // V V // old RDDs new RDDs // - val previousRDD = getOrCompute(previous.endTime) + // for small window, reduce once will be better than twice if (pinvReduceFunc != null && previousRDD.isDefined - // for small window, reduce once will be better than twice && windowDuration >= slideDuration * 5) { // subtract the values from old RDDs