Skip to content

Commit

Permalink
fix scala style
Browse files Browse the repository at this point in the history
  • Loading branch information
davies committed Oct 1, 2014
1 parent 7797c70 commit bd8a4c2
Showing 1 changed file with 25 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ private[python] class TransformFunction(@transient var pfunc: PythonTransformFun
extends function.Function2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] with Serializable {

def apply(rdd: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = {
Option(pfunc.call(time.milliseconds, List(rdd.map(JavaRDD.fromRDD(_)).orNull).asJava)).map(_.rdd)
Option(pfunc.call(time.milliseconds, List(rdd.map(JavaRDD.fromRDD(_)).orNull).asJava))
.map(_.rdd)
}

def apply(rdd: Option[RDD[_]], rdd2: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = {
Expand Down Expand Up @@ -133,8 +134,9 @@ private[python] object PythonDStream {
/**
* Base class for PythonDStream with some common methods
*/
private[python]
abstract class PythonDStream(parent: DStream[_], @transient pfunc: PythonTransformFunction)
private[python] abstract class PythonDStream(
parent: DStream[_],
@transient pfunc: PythonTransformFunction)
extends DStream[Array[Byte]] (parent.ssc) {

val func = new TransformFunction(pfunc)
Expand All @@ -152,9 +154,10 @@ abstract class PythonDStream(parent: DStream[_], @transient pfunc: PythonTransfo
* If `reuse` is true and the result of the `func` is an PythonRDD, then it will cache it
* as an template for future use, this can reduce the Python callbacks.
*/
private[python]
class PythonTransformedDStream (parent: DStream[_], @transient pfunc: PythonTransformFunction,
var reuse: Boolean = false)
private[python] class PythonTransformedDStream (
parent: DStream[_],
@transient pfunc: PythonTransformFunction,
var reuse: Boolean = false)
extends PythonDStream(parent, pfunc) {

// rdd returned by func
Expand Down Expand Up @@ -191,9 +194,10 @@ class PythonTransformedDStream (parent: DStream[_], @transient pfunc: PythonTran
/**
* Transformed from two DStreams in Python.
*/
private[python]
class PythonTransformed2DStream(parent: DStream[_], parent2: DStream[_],
@transient pfunc: PythonTransformFunction)
private[python] class PythonTransformed2DStream(
parent: DStream[_],
parent2: DStream[_],
@transient pfunc: PythonTransformFunction)
extends DStream[Array[Byte]] (parent.ssc) {

val func = new TransformFunction(pfunc)
Expand All @@ -212,8 +216,9 @@ class PythonTransformed2DStream(parent: DStream[_], parent2: DStream[_],
/**
* similar to StateDStream
*/
private[python]
class PythonStateDStream(parent: DStream[Array[Byte]], @transient reduceFunc: PythonTransformFunction)
private[python] class PythonStateDStream(
parent: DStream[Array[Byte]],
@transient reduceFunc: PythonTransformFunction)
extends PythonDStream(parent, reduceFunc) {

super.persist(StorageLevel.MEMORY_ONLY)
Expand All @@ -233,13 +238,13 @@ class PythonStateDStream(parent: DStream[Array[Byte]], @transient reduceFunc: Py
/**
* similar to ReducedWindowedDStream
*/
private[python]
class PythonReducedWindowedDStream(parent: DStream[Array[Byte]],
@transient preduceFunc: PythonTransformFunction,
@transient pinvReduceFunc: PythonTransformFunction,
_windowDuration: Duration,
_slideDuration: Duration
) extends PythonDStream(parent, preduceFunc) {
private[python] class PythonReducedWindowedDStream(
parent: DStream[Array[Byte]],
@transient preduceFunc: PythonTransformFunction,
@transient pinvReduceFunc: PythonTransformFunction,
_windowDuration: Duration,
_slideDuration: Duration)
extends PythonDStream(parent, preduceFunc) {

super.persist(StorageLevel.MEMORY_ONLY)
override val mustCheckpoint = true
Expand All @@ -252,8 +257,7 @@ class PythonReducedWindowedDStream(parent: DStream[Array[Byte]],

override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
val currentTime = validTime
val current = new Interval(currentTime - windowDuration,
currentTime)
val current = new Interval(currentTime - windowDuration, currentTime)
val previous = current - slideDuration

// _____________________________
Expand All @@ -266,11 +270,10 @@ class PythonReducedWindowedDStream(parent: DStream[Array[Byte]],
// V V
// old RDDs new RDDs
//

val previousRDD = getOrCompute(previous.endTime)

// for small window, reduce once will be better than twice
if (pinvReduceFunc != null && previousRDD.isDefined
// for small window, reduce once will be better than twice
&& windowDuration >= slideDuration * 5) {

// subtract the values from old RDDs
Expand Down

0 comments on commit bd8a4c2

Please sign in to comment.