Skip to content

Commit

Permalink
edited the comment to add more precise description
Browse files Browse the repository at this point in the history
  • Loading branch information
giwa committed Aug 11, 2014
1 parent bdde697 commit a65f302
Showing 1 changed file with 4 additions and 3 deletions.
7 changes: 4 additions & 3 deletions python/pyspark/streaming_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ def setUp(self):
self.ssc = StreamingContext(appName=class_name, duration=Seconds(1))

def tearDown(self):
# Do not call StreamingContext.stop directly because we do not wait to shutdown
# call back server and py4j client
# Do not call pyspark.streaming.context.StreamingContext.stop directly because
# we do not wait to shutdowncall back server and py4j client
self.ssc._jssc.stop()
self.ssc._sc.stop()
# Why does it long time to terminaete StremaingContext and SparkContext?
Expand Down Expand Up @@ -146,7 +146,7 @@ def _run_stream(self, test_input, test_func, expected_output):
"""Start stream and return the output"""
# Generate input stream with user-defined input
test_input_stream = self.ssc._testInputStream(test_input)
# Applied test function to stream
# Apply test function to stream
test_stream = test_func(test_input_stream)
# Add job to get output from stream
test_stream._test_output(StreamOutput.result)
Expand All @@ -160,6 +160,7 @@ def _run_stream(self, test_input, test_func, expected_output):
if (current_time - start_time) > self.timeout:
break
self.ssc.awaitTermination(50)
# check if the output is the same length of expexted output
if len(expected_output) == len(StreamOutput.result):
break
return StreamOutput.result
Expand Down

0 comments on commit a65f302

Please sign in to comment.