diff --git a/python/pyspark/streaming_tests.py b/python/pyspark/streaming_tests.py index ef9b87756fcef..ec45acec94dbf 100644 --- a/python/pyspark/streaming_tests.py +++ b/python/pyspark/streaming_tests.py @@ -50,8 +50,8 @@ def setUp(self): self.ssc = StreamingContext(appName=class_name, duration=Seconds(1)) def tearDown(self): - # Do not call StreamingContext.stop directly because we do not wait to shutdown - # call back server and py4j client + # Do not call pyspark.streaming.context.StreamingContext.stop directly because + # we do not wait to shutdowncall back server and py4j client self.ssc._jssc.stop() self.ssc._sc.stop() # Why does it long time to terminaete StremaingContext and SparkContext? @@ -146,7 +146,7 @@ def _run_stream(self, test_input, test_func, expected_output): """Start stream and return the output""" # Generate input stream with user-defined input test_input_stream = self.ssc._testInputStream(test_input) - # Applied test function to stream + # Apply test function to stream test_stream = test_func(test_input_stream) # Add job to get output from stream test_stream._test_output(StreamOutput.result) @@ -160,6 +160,7 @@ def _run_stream(self, test_input, test_func, expected_output): if (current_time - start_time) > self.timeout: break self.ssc.awaitTermination(50) + # check if the output is the same length of expexted output if len(expected_output) == len(StreamOutput.result): break return StreamOutput.result