diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 8a9e2dab7fb07..ffcf70cc854ab 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -354,6 +354,15 @@ def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None)
 
     def reduceByKeyAndWindow(self, func, invFunc, windowDuration, slideDuration,
                              numPartitions=None):
+
+        duration = self._jdstream.dstream().slideDuration().milliseconds()
+        if int(windowDuration * 1000) % duration != 0:
+            raise ValueError("windowDuration must be multiple of the slide duration (%d ms)"
+                             % duration)
+        if int(slideDuration * 1000) % duration != 0:
+            raise ValueError("slideDuration must be multiple of the slide duration (%d ms)"
+                             % duration)
+
         reduced = self.reduceByKey(func)
 
         def reduceFunc(a, b, t):
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 06fcc29850504..843d6ee04ca33 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -292,6 +292,12 @@ def func(dstream):
                     [('a', [2, 3, 4])], [('a', [3, 4])], [('a', [4])]]
         self._test_func(input, func, expected)
 
+    def test_reduce_by_invalid_window(self):
+        input1 = [range(3), range(5), range(1), range(6)]
+        d1 = self.ssc.queueStream(input1)
+        self.assertRaises(ValueError, lambda: d1.reduceByKeyAndWindow(None, None, 0.1, 0.1))
+        self.assertRaises(ValueError, lambda: d1.reduceByKeyAndWindow(None, None, 1, 0.1))
+
     def update_state_by_key(self):
 
         def updater(it):
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 66cf0c968478c..47c3974b61699 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -92,7 +92,8 @@ private[spark] object PythonDStream {
  * If the result RDD is PythonRDD, then it will cache it as an template for future use,
  * this can reduce the Python callbacks.
  */
-private[spark] class PythonTransformedDStream (parent: DStream[_], pfunc: PythonRDDFunction,
+private[spark]
+class PythonTransformedDStream (parent: DStream[_], pfunc: PythonRDDFunction,
                                                var reuse: Boolean = false)
   extends PythonDStream(parent) {
 
@@ -180,16 +181,6 @@ class PythonReducedWindowedDStream(parent: DStream[Array[Byte]],
     _slideDuration: Duration
     ) extends PythonStateDStream(parent, preduceFunc) {
 
-  assert(_windowDuration.isMultipleOf(parent.slideDuration),
-    "The window duration of ReducedWindowedDStream (" + _windowDuration + ") " +
-    "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
-  )
-
-  assert(_slideDuration.isMultipleOf(parent.slideDuration),
-    "The slide duration of ReducedWindowedDStream (" + _slideDuration + ") " +
-    "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
-  )
-
  val invReduceFunc = new RDDFunction(pinvReduceFunc)
 
  def windowDuration: Duration = _windowDuration
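
Note (illustration only, not part of the patch): the change moves the window/slide
validation from the JVM-side asserts in PythonReducedWindowedDStream into Python, so
the user gets a ValueError before the job starts. Below is a minimal standalone sketch
of that check; the helper name check_durations and the 500 ms parent slide duration
are assumptions for the example (in the patch the value comes from
self._jdstream.dstream().slideDuration().milliseconds()).

# Illustration only -- a sketch of the validation reduceByKeyAndWindow now
# performs, assuming a hypothetical parent slide duration of 500 ms.
def check_durations(windowDuration, slideDuration, parent_slide_ms=500):
    # Durations are given in seconds; convert to milliseconds before checking.
    if int(windowDuration * 1000) % parent_slide_ms != 0:
        raise ValueError("windowDuration must be multiple of the slide duration (%d ms)"
                         % parent_slide_ms)
    if int(slideDuration * 1000) % parent_slide_ms != 0:
        raise ValueError("slideDuration must be multiple of the slide duration (%d ms)"
                         % parent_slide_ms)

check_durations(1.0, 0.5)        # OK: 1000 ms and 500 ms are multiples of 500 ms
try:
    check_durations(0.1, 0.1)    # 100 ms is not a multiple of 500 ms
except ValueError as e:
    print(e)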