address comments
davies committed Sep 30, 2014
1 parent 069a94c commit e00136b
Showing 9 changed files with 245 additions and 221 deletions.
@@ -52,6 +52,7 @@ private[spark] class PythonRDD(
accumulator: Accumulator[JList[Array[Byte]]])
extends RDD[Array[Byte]](parent) {

// Create a new PythonRDD with the same Python settings but a different parent.
def copyTo(rdd: RDD[_]): PythonRDD = {
new PythonRDD(rdd, command, envVars, pythonIncludes, preservePartitoning,
pythonExec, broadcastVars, accumulator)
1 change: 0 additions & 1 deletion python/pyspark/java_gateway.py
@@ -23,7 +23,6 @@
import platform
from subprocess import Popen, PIPE
from threading import Thread

from py4j.java_gateway import java_import, JavaGateway, GatewayClient


32 changes: 21 additions & 11 deletions python/pyspark/streaming/context.py
@@ -31,6 +31,11 @@
def _daemonize_callback_server():
"""
Hack Py4J to daemonize the callback server.
The callback server thread has daemon=False, so it will block the driver
from exiting if it is not shut down. The following code replaces `start()`
of CallbackServer with a new version that sets daemon=True for this
thread.
"""
# TODO: create a patch for Py4J
import socket
@@ -47,7 +52,6 @@ def start(self):
1)
try:
self.server_socket.bind((self.address, self.port))
# self.port = self.server_socket.getsockname()[1]
except Exception:
msg = 'An error occurred while trying to start the callback server'
logger.exception(msg)
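For orientation, a hedged sketch of how such a monkey-patch fits together; it is modeled on the docstring and the fragment above rather than copied from the commit, and it assumes `CallbackServer` lives in `py4j.java_gateway`, `Py4JError` in `py4j.protocol`, and that the server keeps its address and port on `self.address` and `self.port` (as in py4j 0.8.x):

import logging
import socket
from threading import Thread

from py4j.java_gateway import CallbackServer
from py4j.protocol import Py4JError

logger = logging.getLogger("py4j")


def _patched_start(self):
    # Same socket setup as the stock CallbackServer.start(), but the serving
    # thread is created with daemon=True so it cannot keep the driver alive.
    self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        self.server_socket.bind((self.address, self.port))
    except Exception:
        msg = 'An error occurred while trying to start the callback server'
        logger.exception(msg)
        raise Py4JError(msg)
    self.thread = Thread(target=self.run)
    self.thread.daemon = True
    self.thread.start()


CallbackServer.start = _patched_start  # applied once, before any callbacks are registered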
@@ -63,19 +67,21 @@ def start(self):

class StreamingContext(object):
"""
Main entry point for Spark Streaming functionality. A StreamingContext represents the
connection to a Spark cluster, and can be used to create L{DStream}s and
broadcast variables on that cluster.
Main entry point for Spark Streaming functionality. A StreamingContext
represents the connection to a Spark cluster, and can be used to create
L{DStream}s from various input sources. It can be created from an existing L{SparkContext}.
After creating and transforming DStreams, the streaming computation can
be started and stopped using `context.start()` and `context.stop()`,
respectively. `context.awaitTermination()` allows the current thread
to wait for the termination of the context by `stop()` or by an exception.
"""

def __init__(self, sparkContext, duration):
"""
Create a new StreamingContext. At least the master and app name and duration
should be set, either through the named parameters here or through C{conf}.
Create a new StreamingContext.
@param sparkContext: L{SparkContext} object.
@param duration: seconds for SparkStreaming.
@param duration: batch interval, in seconds.
"""
self._sc = sparkContext
self._jvm = self._sc._jvm
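A short usage sketch following the docstrings above; the one-second batch interval, the socketTextStream source, and the word-count pipeline are illustrative assumptions, not part of this commit:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)  # reuse an existing SparkContext; 1-second batches

lines = ssc.socketTextStream("localhost", 9999)  # an input DStream
counts = (lines.flatMap(lambda line: line.split(" "))
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()

ssc.start()             # start the streaming computation
ssc.awaitTermination()  # block until stop() is called or an exception occurs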
@@ -127,8 +133,12 @@ def awaitTermination(self, timeout=None):

def stop(self, stopSparkContext=True, stopGraceFully=False):
"""
Stop the execution of the streams immediately (does not wait for all received data
to be processed).
Stop the execution of the streams, with the option of ensuring all
received data has been processed.
@param stopSparkContext: Stop the associated SparkContext or not
@param stopGraceFully: Stop gracefully by waiting for the processing
of all received data to be completed
"""
self._jssc.stop(stopSparkContext, stopGraceFully)
if stopSparkContext:
@@ -140,7 +150,7 @@ def remember(self, duration):
in the last given duration. DStreams remember RDDs only for a
limited duration of time and release them for garbage collection.
This method allows the developer to specify how long to remember
the RDDs ( if the developer wishes to query old data outside the
the RDDs (if the developer wishes to query old data outside the
DStream computation).
@param duration Minimum duration (in seconds) that each DStream
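A hedged sketch of the stop() and remember() calls documented in this file, assuming `ssc` is a running StreamingContext; the flag and duration values are illustrative:

ssc.remember(60)  # keep the RDDs of roughly the last 60 seconds for ad-hoc queries
ssc.stop(stopSparkContext=False, stopGraceFully=True)  # drain received data, keep the SparkContext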