diff --git a/python/docs/epytext.py b/python/docs/epytext.py
index 61d731bff570d..19fefbfc057a4 100644
--- a/python/docs/epytext.py
+++ b/python/docs/epytext.py
@@ -5,7 +5,7 @@
     (r"L{([\w.()]+)}", r":class:`\1`"),
     (r"[LC]{(\w+\.\w+)\(\)}", r":func:`\1`"),
     (r"C{([\w.()]+)}", r":class:`\1`"),
-    (r"[IBCM]{(.+)}", r"`\1`"),
+    (r"[IBCM]{([^}]+)}", r"`\1`"),
     ('pyspark.rdd.RDD', 'RDD'),
 )
diff --git a/python/docs/index.rst b/python/docs/index.rst
index 25b3f9bd93e63..e0f4e5c192acf 100644
--- a/python/docs/index.rst
+++ b/python/docs/index.rst
@@ -13,6 +13,7 @@ Contents:
 
    pyspark
    pyspark.sql
+   pyspark.streaming
    pyspark.mllib
diff --git a/python/docs/modules.rst b/python/docs/modules.rst
index 183564659fbcf..04dce62be5f49 100644
--- a/python/docs/modules.rst
+++ b/python/docs/modules.rst
@@ -5,3 +5,6 @@
    :maxdepth: 4
 
    pyspark
+   pyspark.sql
+   pyspark.streaming
+   pyspark.mllib
diff --git a/python/docs/pyspark.rst b/python/docs/pyspark.rst
index a68bd62433085..e81be3b6cb796 100644
--- a/python/docs/pyspark.rst
+++ b/python/docs/pyspark.rst
@@ -7,8 +7,9 @@ Subpackages
 .. toctree::
     :maxdepth: 1
 
-    pyspark.mllib
     pyspark.sql
+    pyspark.streaming
+    pyspark.mllib
 
 Contents
 --------
diff --git a/python/pyspark/streaming/__init__.py b/python/pyspark/streaming/__init__.py
index 00d2823525992..d2644a1d4ffab 100644
--- a/python/pyspark/streaming/__init__.py
+++ b/python/pyspark/streaming/__init__.py
@@ -17,3 +17,5 @@
 
 from pyspark.streaming.context import StreamingContext
 from pyspark.streaming.dstream import DStream
+
+__all__ = ['StreamingContext', 'DStream']
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index b84e12ebac1dc..aabbbd958080a 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -71,7 +71,7 @@ class StreamingContext(object):
     """
     Main entry point for Spark Streaming functionality. A StreamingContext
     represents the connection to a Spark cluster, and can be used to create
-    L{DStream}s various input sources. It can be from an existing L{SparkContext}.
+    L{DStream} various input sources. It can be from an existing L{SparkContext}.
     After creating and transforming DStreams, the streaming computation can
     be started and stopped using `context.start()` and `context.stop()`, respectively.
     `context.awaitTransformation()` allows the current thread
@@ -180,8 +180,8 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):
         Stop the execution of the streams, with option of ensuring all
         received data has been processed.
 
-        @param stopSparkContext Stop the associated SparkContext or not
-        @param stopGracefully Stop gracefully by waiting for the processing
+        @param stopSparkContext: Stop the associated SparkContext or not
+        @param stopGracefully: Stop gracefully by waiting for the processing
             of all received data to be completed
         """
         self._jssc.stop(stopSparkContext, stopGraceFully)
@@ -197,7 +197,7 @@ def remember(self, duration):
         the RDDs (if the developer wishes to query old data outside the
         DStream computation).
 
-        @param duration Minimum duration (in seconds) that each DStream
+        @param duration: Minimum duration (in seconds) that each DStream
             should remember its RDDs
         """
         self._jssc.remember(self._jduration(duration))
@@ -207,7 +207,7 @@ def checkpoint(self, directory):
         Sets the context to periodically checkpoint the DStream operations for master
         fault-tolerance. The graph will be checkpointed every batch interval.
 
-        @param directory HDFS-compatible directory where the checkpoint data
+        @param directory: HDFS-compatible directory where the checkpoint data
             will be reliably stored
         """
         self._jssc.checkpoint(directory)
@@ -215,12 +215,12 @@ def checkpoint(self, directory):
     def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
         """
         Create an input from TCP source hostname:port. Data is received using
-        a TCP socket and receive byte is interpreted as UTF8 encoded '\n' delimited
+        a TCP socket and receive byte is interpreted as UTF8 encoded ``\\n`` delimited
         lines.
 
-        @param hostname Hostname to connect to for receiving data
-        @param port Port to connect to for receiving data
-        @param storageLevel Storage level to use for storing the received objects
+        @param hostname: Hostname to connect to for receiving data
+        @param port: Port to connect to for receiving data
+        @param storageLevel: Storage level to use for storing the received objects
         """
         jlevel = self._sc._getJavaStorageLevel(storageLevel)
         return DStream(self._jssc.socketTextStream(hostname, port, jlevel), self,
@@ -249,9 +249,9 @@ def queueStream(self, rdds, oneAtATime=True, default=None):
         NOTE: changes to the queue after the stream is created will not be recognized.
 
-        @param rdds Queue of RDDs
-        @param oneAtATime pick one rdd each time or pick all of them once.
-        @param default The default rdd if no more in rdds
+        @param rdds: Queue of RDDs
+        @param oneAtATime: pick one rdd each time or pick all of them once.
+        @param default: The default rdd if no more in rdds
         """
         if default and not isinstance(default, RDD):
             default = self._sc.parallelize(default)
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index f8ebb7e68d8d7..a77e8f505e147 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -284,7 +284,7 @@ def transform(self, func):
         on each RDD of 'this' DStream.
 
         `func` can have one argument of `rdd`, or have two arguments of
-        (`time`, `rdd`)
+        (`time`, `rdd`)
         """
         resue = False
         if func.func_code.co_argcount == 1:
@@ -328,7 +328,8 @@ def _slideDuration(self):
     def union(self, other):
         """
         Return a new DStream by unifying data of another DStream with this DStream.
-        @param other Another DStream having the same interval (i.e., slideDuration)
+
+        @param other: Another DStream having the same interval (i.e., slideDuration)
             as this DStream.
         """
         if self._slideDuration != other._slideDuration:
@@ -348,11 +349,11 @@ def cogroup(self, other, numPartitions=None):
 
     def join(self, other, numPartitions=None):
         """
-        Return a new DStream by applying 'join' between RDDs of `this` DStream and
+        Return a new DStream by applying 'join' between RDDs of `this` DStream and
         `other` DStream.
 
         Hash partitioning is used to generate the RDDs with `numPartitions`
-        partitions.
+        partitions.
         """
         if numPartitions is None:
             numPartitions = self.ctx.defaultParallelism
@@ -360,11 +361,11 @@ def join(self, other, numPartitions=None):
 
     def leftOuterJoin(self, other, numPartitions=None):
         """
-        Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+        Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
         `other` DStream.
 
         Hash partitioning is used to generate the RDDs with `numPartitions`
-        partitions.
+        partitions.
         """
         if numPartitions is None:
             numPartitions = self.ctx.defaultParallelism
@@ -372,11 +373,11 @@ def leftOuterJoin(self, other, numPartitions=None):
 
     def rightOuterJoin(self, other, numPartitions=None):
         """
-        Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+        Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
         `other` DStream.
 
         Hash partitioning is used to generate the RDDs with `numPartitions`
-        partitions.
+        partitions.
         """
         if numPartitions is None:
             numPartitions = self.ctx.defaultParallelism
@@ -384,11 +385,11 @@ def rightOuterJoin(self, other, numPartitions=None):
 
     def fullOuterJoin(self, other, numPartitions=None):
         """
-        Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+        Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
         `other` DStream.
 
         Hash partitioning is used to generate the RDDs with `numPartitions`
-        partitions.
+        partitions.
         """
         if numPartitions is None:
             numPartitions = self.ctx.defaultParallelism
@@ -424,9 +425,9 @@ def window(self, windowDuration, slideDuration=None):
         Return a new DStream in which each RDD contains all the elements in seen in a
         sliding window of time over this DStream.
 
-        @param windowDuration width of the window; must be a multiple of this DStream's
+        @param windowDuration: width of the window; must be a multiple of this DStream's
                               batching interval
-        @param slideDuration sliding interval of the window (i.e., the interval after which
+        @param slideDuration: sliding interval of the window (i.e., the interval after which
                              the new DStream will generate RDDs); must be a multiple of this
                              DStream's batching interval
         """
@@ -448,13 +449,13 @@ def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuratio
         2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
         This is more efficient than `invReduceFunc` is None.
 
-        @param reduceFunc associative reduce function
-        @param invReduceFunc inverse reduce function of `reduceFunc`
-        @param windowDuration width of the window; must be a multiple of this DStream's
-                              batching interval
-        @param slideDuration sliding interval of the window (i.e., the interval after which
-                             the new DStream will generate RDDs); must be a multiple of this
-                             DStream's batching interval
+        @param reduceFunc: associative reduce function
+        @param invReduceFunc: inverse reduce function of `reduceFunc`
+        @param windowDuration: width of the window; must be a multiple of this DStream's
+                               batching interval
+        @param slideDuration: sliding interval of the window (i.e., the interval after which
+                              the new DStream will generate RDDs); must be a multiple of this
+                              DStream's batching interval
         """
         keyed = self.map(lambda x: (1, x))
         reduced = keyed.reduceByKeyAndWindow(reduceFunc, invReduceFunc,
@@ -478,12 +479,12 @@ def countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=Non
         Return a new DStream in which each RDD contains the count of distinct elements in
         RDDs in a sliding window over this DStream.
 
-        @param windowDuration width of the window; must be a multiple of this DStream's
+        @param windowDuration: width of the window; must be a multiple of this DStream's
                               batching interval
-        @param slideDuration sliding interval of the window (i.e., the interval after which
+        @param slideDuration: sliding interval of the window (i.e., the interval after which
                               the new DStream will generate RDDs); must be a multiple of this
                               DStream's batching interval
-        @param numPartitions number of partitions of each RDD in the new DStream.
+        @param numPartitions: number of partitions of each RDD in the new DStream.
         """
         keyed = self.map(lambda x: (x, 1))
         counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub,
@@ -495,12 +496,12 @@ def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None)
         Return a new DStream by applying `groupByKey` over a sliding window.
         Similar to `DStream.groupByKey()`, but applies it over a sliding window.
 
-        @param windowDuration width of the window; must be a multiple of this DStream's
+        @param windowDuration: width of the window; must be a multiple of this DStream's
                               batching interval
-        @param slideDuration sliding interval of the window (i.e., the interval after which
+        @param slideDuration: sliding interval of the window (i.e., the interval after which
                               the new DStream will generate RDDs); must be a multiple of this
                               DStream's batching interval
-        @param numPartitions Number of partitions of each RDD in the new DStream.
+        @param numPartitions: Number of partitions of each RDD in the new DStream.
         """
         ls = self.mapValues(lambda x: [x])
         grouped = ls.reduceByKeyAndWindow(lambda a, b: a.extend(b) or a, lambda a, b: a[len(b):],
@@ -519,15 +520,15 @@ def reduceByKeyAndWindow(self, func, invFunc, windowDuration, slideDuration=None
         `invFunc` can be None, then it will reduce all the RDDs in window, could be slower
         than having `invFunc`.
 
-        @param reduceFunc associative reduce function
-        @param invReduceFunc inverse function of `reduceFunc`
-        @param windowDuration width of the window; must be a multiple of this DStream's
+        @param reduceFunc: associative reduce function
+        @param invReduceFunc: inverse function of `reduceFunc`
+        @param windowDuration: width of the window; must be a multiple of this DStream's
                               batching interval
-        @param slideDuration sliding interval of the window (i.e., the interval after which
+        @param slideDuration: sliding interval of the window (i.e., the interval after which
                               the new DStream will generate RDDs); must be a multiple of this
                               DStream's batching interval
-        @param numPartitions number of partitions of each RDD in the new DStream.
-        @param filterFunc function to filter expired key-value pairs;
+        @param numPartitions: number of partitions of each RDD in the new DStream.
+        @param filterFunc: function to filter expired key-value pairs;
                           only pairs that satisfy the function are retained
                           set this to null if you do not want to filter
         """
@@ -567,7 +568,7 @@ def updateStateByKey(self, updateFunc, numPartitions=None):
         Return a new "state" DStream where the state for each key is updated by applying
         the given function on the previous state of the key and the new values of the key.
 
-        @param updateFunc State update function ([(k, vs, s)] -> [(k, s)]).
+        @param updateFunc: State update function ([(k, vs, s)] -> [(k, s)]).
                            If `s` is None, then `k` will be eliminated.
         """
         if numPartitions is None:
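For context on the epytext.py change in the first hunk: the old rule `(.+)` is greedy, so two inline-markup groups on one docstring line are swallowed into a single match, while `([^}]+)` stops at the first closing brace. A minimal sketch of the difference, assuming the rules are applied with `re.sub` as elsewhere in epytext.py:

    import re

    line = "B{bold} and I{italic} in one docstring line"

    # Old, greedy rule: matches from the first '{' to the last '}' on the line.
    print(re.sub(r"[IBCM]{(.+)}", r"`\1`", line))
    # `bold} and I{italic` in one docstring line

    # New rule: '[^}]+' cannot cross a closing brace, so each group converts separately.
    print(re.sub(r"[IBCM]{([^}]+)}", r"`\1`", line))
    # `bold` and `italic` in one docstring line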
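A short usage sketch of the windowed API whose docstrings are touched above; the hostname, port, durations, and checkpoint path are placeholders, and checkpointing is enabled because an inverse reduce function is passed:

    from operator import add, sub

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="WindowedWordCount")
    ssc = StreamingContext(sc, 1)          # 1-second batch interval
    ssc.checkpoint("/tmp/checkpoint")      # required when an inverse reduce function is used

    # Lines arrive over TCP as UTF-8, '\n'-delimited text (see socketTextStream above).
    lines = ssc.socketTextStream("localhost", 9999)
    pairs = lines.flatMap(lambda line: line.split()).map(lambda word: (word, 1))

    # windowDuration=30s and slideDuration=10s are both multiples of the 1s batch interval.
    counts = pairs.reduceByKeyAndWindow(add, sub, 30, 10)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()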