Skip to content

Commit

Permalink
move check of window into Python
Browse files Browse the repository at this point in the history
  • Loading branch information
davies committed Sep 28, 2014
1 parent fce0ef5 commit e059ca2
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 11 deletions.
9 changes: 9 additions & 0 deletions python/pyspark/streaming/dstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,15 @@ def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None)

def reduceByKeyAndWindow(self, func, invFunc,
windowDuration, slideDuration, numPartitions=None):

duration = self._jdstream.dstream().slideDuration().milliseconds()
if int(windowDuration * 1000) % duration != 0:
raise ValueError("windowDuration must be multiple of the slide duration (%d ms)"
% duration)
if int(slideDuration * 1000) % duration != 0:
raise ValueError("slideDuration must be multiple of the slide duration (%d ms)"
% duration)

reduced = self.reduceByKey(func)

def reduceFunc(a, b, t):
Expand Down
6 changes: 6 additions & 0 deletions python/pyspark/streaming/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,12 @@ def func(dstream):
[('a', [2, 3, 4])], [('a', [3, 4])], [('a', [4])]]
self._test_func(input, func, expected)

def test_reduce_by_invalid_window(self):
    """reduceByKeyAndWindow must reject window/slide durations that are not
    multiples of the parent DStream's slide duration (raises ValueError)."""
    batches = [range(3), range(5), range(1), range(6)]
    stream = self.ssc.queueStream(batches)
    # 0.1s window/slide are not multiples of the stream's slide duration.
    with self.assertRaises(ValueError):
        stream.reduceByKeyAndWindow(None, None, 0.1, 0.1)
    # Valid window but invalid slide duration must also be rejected.
    with self.assertRaises(ValueError):
        stream.reduceByKeyAndWindow(None, None, 1, 0.1)

def update_state_by_key(self):

def updater(it):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ private[spark] object PythonDStream {
* If the result RDD is a PythonRDD, then it will cache it as a template for future use,
* this can reduce the Python callbacks.
*/
private[spark] class PythonTransformedDStream (parent: DStream[_], pfunc: PythonRDDFunction,
private[spark]
class PythonTransformedDStream (parent: DStream[_], pfunc: PythonRDDFunction,
var reuse: Boolean = false)
extends PythonDStream(parent) {

Expand Down Expand Up @@ -180,16 +181,6 @@ class PythonReducedWindowedDStream(parent: DStream[Array[Byte]],
_slideDuration: Duration
) extends PythonStateDStream(parent, preduceFunc) {

assert(_windowDuration.isMultipleOf(parent.slideDuration),
"The window duration of ReducedWindowedDStream (" + _windowDuration + ") " +
"must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
)

assert(_slideDuration.isMultipleOf(parent.slideDuration),
"The slide duration of ReducedWindowedDStream (" + _slideDuration + ") " +
"must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
)

val invReduceFunc = new RDDFunction(pinvReduceFunc)

def windowDuration: Duration = _windowDuration
Expand Down

0 comments on commit e059ca2

Please sign in to comment.