Skip to content

Commit

Permalink
Changed awaitTermination not to call awaitTermincation in Scala. Just…
Browse files Browse the repository at this point in the history
… use time.sleep instread
  • Loading branch information
giwa committed Aug 19, 2014
1 parent 09a28bf commit 268a6a5
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
4 changes: 3 additions & 1 deletion python/pyspark/streaming/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#

import sys
import time
from signal import signal, SIGTERM, SIGINT

from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
Expand Down Expand Up @@ -102,11 +103,12 @@ def start(self):
def awaitTermination(self, timeout=None):
"""
Wait for the execution to stop.
timeout is milliseconds
"""
if timeout is None:
self._jssc.awaitTermination()
else:
self._jssc.awaitTermination(timeout)
time.sleep(timeout/1000)

#TODO: add storageLevel
def socketTextStream(self, hostname, port):
Expand Down
5 changes: 3 additions & 2 deletions python/pyspark/streaming_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def tearDown(self):
self.ssc._sc.stop()
# Why does it long time to terminate StremaingContext and SparkContext?
# Should we change the sleep time if this depends on machine spec?
time.sleep(10)
time.sleep(1)

@classmethod
def tearDownClass(cls):
Expand Down Expand Up @@ -436,7 +436,8 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
# Check time out.
if (current_time - start_time) > self.timeout:
break
self.ssc.awaitTermination(50)
#self.ssc.awaitTermination(50)
time.sleep(0.05)
# Check if the output is the same length of expexted output.
if len(expected_output) == len(result):
break
Expand Down

0 comments on commit 268a6a5

Please sign in to comment.