fix sphinx docs
davies committed Oct 2, 2014
1 parent 6bb9d91 · commit c7bbbce
Showing 7 changed files with 54 additions and 46 deletions.
python/docs/epytext.py (2 changes: 1 addition & 1 deletion)
@@ -5,7 +5,7 @@
(r"L{([\w.()]+)}", r":class:`\1`"),
(r"[LC]{(\w+\.\w+)\(\)}", r":func:`\1`"),
(r"C{([\w.()]+)}", r":class:`\1`"),
(r"[IBCM]{(.+)}", r"`\1`"),
(r"[IBCM]{([^}]+)}", r"`\1`"),
('pyspark.rdd.RDD', 'RDD'),
)

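The epytext change swaps the greedy `(.+)` for `([^}]+)`, so a docstring line containing more than one inline marker is converted correctly: the greedy group runs to the last `}` on the line and merges the markers. A minimal standalone sketch of the difference, applying only the catch-all rule that changed (not the full rule table in epytext.py):

import re

line = "Return a C{list} of C{str} values."

# Greedy '.+' matches up to the LAST '}' on the line, so the two C{...}
# markers collapse into one mangled replacement.
print(re.sub(r"[IBCM]{(.+)}", r"`\1`", line))
# Return a `list} of C{str` values.

# '[^}]+' stops at the FIRST '}', so each marker is converted on its own.
print(re.sub(r"[IBCM]{([^}]+)}", r"`\1`", line))
# Return a `list` of `str` values.
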
python/docs/index.rst (1 change: 1 addition & 0 deletions)
@@ -13,6 +13,7 @@ Contents:

pyspark
pyspark.sql
pyspark.streaming
pyspark.mllib


python/docs/modules.rst (3 changes: 3 additions & 0 deletions)
@@ -5,3 +5,6 @@
:maxdepth: 4

pyspark
pyspark.sql
pyspark.streaming
pyspark.mllib
python/docs/pyspark.rst (3 changes: 2 additions & 1 deletion)
@@ -7,8 +7,9 @@ Subpackages
.. toctree::
:maxdepth: 1

pyspark.mllib
pyspark.sql
pyspark.streaming
pyspark.mllib

Contents
--------
python/pyspark/streaming/__init__.py (2 changes: 2 additions & 0 deletions)
@@ -17,3 +17,5 @@

from pyspark.streaming.context import StreamingContext
from pyspark.streaming.dstream import DStream

__all__ = ['StreamingContext', 'DStream']
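
Adding `__all__` makes the package's public surface explicit; star imports follow it, and Sphinx's automodule should pick up only the listed names when documenting members. A quick sketch of the effect, assuming PySpark is importable:

# Assumes a PySpark installation on the Python path.
import pyspark.streaming

print(pyspark.streaming.__all__)
# ['StreamingContext', 'DStream']

# A star import now binds only the two documented classes, not every
# helper name the package itself happens to import.
from pyspark.streaming import *
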
python/pyspark/streaming/context.py (24 changes: 12 additions & 12 deletions)
@@ -71,7 +71,7 @@ class StreamingContext(object):
"""
Main entry point for Spark Streaming functionality. A StreamingContext
represents the connection to a Spark cluster, and can be used to create
L{DStream}s various input sources. It can be from an existing L{SparkContext}.
L{DStream} various input sources. It can be from an existing L{SparkContext}.
After creating and transforming DStreams, the streaming computation can
be started and stopped using `context.start()` and `context.stop()`,
respectively. `context.awaitTransformation()` allows the current thread
@@ -180,8 +180,8 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):
Stop the execution of the streams, with option of ensuring all
received data has been processed.
@param stopSparkContext Stop the associated SparkContext or not
@param stopGracefully Stop gracefully by waiting for the processing
@param stopSparkContext: Stop the associated SparkContext or not
@param stopGracefully: Stop gracefully by waiting for the processing
of all received data to be completed
"""
self._jssc.stop(stopSparkContext, stopGraceFully)
@@ -197,7 +197,7 @@ def remember(self, duration):
the RDDs (if the developer wishes to query old data outside the
DStream computation).
@param duration Minimum duration (in seconds) that each DStream
@param duration: Minimum duration (in seconds) that each DStream
should remember its RDDs
"""
self._jssc.remember(self._jduration(duration))
@@ -207,20 +207,20 @@ def checkpoint(self, directory):
Sets the context to periodically checkpoint the DStream operations for master
fault-tolerance. The graph will be checkpointed every batch interval.
@param directory HDFS-compatible directory where the checkpoint data
@param directory: HDFS-compatible directory where the checkpoint data
will be reliably stored
"""
self._jssc.checkpoint(directory)

def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
"""
Create an input from TCP source hostname:port. Data is received using
a TCP socket and receive byte is interpreted as UTF8 encoded '\n' delimited
a TCP socket and receive byte is interpreted as UTF8 encoded ``\\n`` delimited
lines.
@param hostname Hostname to connect to for receiving data
@param port Port to connect to for receiving data
@param storageLevel Storage level to use for storing the received objects
@param hostname: Hostname to connect to for receiving data
@param port: Port to connect to for receiving data
@param storageLevel: Storage level to use for storing the received objects
"""
jlevel = self._sc._getJavaStorageLevel(storageLevel)
return DStream(self._jssc.socketTextStream(hostname, port, jlevel), self,
@@ -249,9 +249,9 @@ def queueStream(self, rdds, oneAtATime=True, default=None):
NOTE: changes to the queue after the stream is created will not be recognized.
@param rdds Queue of RDDs
@param oneAtATime pick one rdd each time or pick all of them once.
@param default The default rdd if no more in rdds
@param rdds: Queue of RDDs
@param oneAtATime: pick one rdd each time or pick all of them once.
@param default: The default rdd if no more in rdds
"""
if default and not isinstance(default, RDD):
default = self._sc.parallelize(default)
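
Two details in these hunks are easy to miss: each `@param name` line gains a colon, matching the `:param name:` field syntax Sphinx expects once the epytext markup is converted, and the socketTextStream docstring replaces the bare '\n' (which Python turns into a real newline inside a non-raw docstring) with ``\\n``, so Sphinx renders the literal delimiter as inline code. A minimal usage sketch of the documented parameters, assuming a local Spark build and a text server on localhost:9999 (for example `nc -lk 9999`):

from pyspark import SparkContext, StorageLevel
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)                    # 1-second batch interval
ssc.checkpoint("/tmp/streaming-checkpoint")      # HDFS-compatible directory

# hostname, port and storageLevel: the three parameters documented above.
lines = ssc.socketTextStream("localhost", 9999,
                             StorageLevel.MEMORY_AND_DISK_SER_2)

counts = (lines.flatMap(lambda l: l.split(" "))
               .map(lambda w: (w, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()

ssc.start()
ssc.awaitTermination()    # keep the main thread alive while batches run
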
python/pyspark/streaming/dstream.py (65 changes: 33 additions & 32 deletions)
@@ -284,7 +284,7 @@ def transform(self, func):
on each RDD of 'this' DStream.
`func` can have one argument of `rdd`, or have two arguments of
(`time`, `rdd`)
(`time`, `rdd`)
"""
resue = False
if func.func_code.co_argcount == 1:
@@ -328,7 +328,8 @@ def _slideDuration(self):
def union(self, other):
"""
Return a new DStream by unifying data of another DStream with this DStream.
@param other Another DStream having the same interval (i.e., slideDuration)
@param other: Another DStream having the same interval (i.e., slideDuration)
as this DStream.
"""
if self._slideDuration != other._slideDuration:
@@ -348,47 +349,47 @@ def cogroup(self, other, numPartitions=None):

def join(self, other, numPartitions=None):
"""
Return a new DStream by applying 'join' between RDDs of `this` DStream and
Return a new DStream by applying 'join' between RDDs of `this` DStream and
`other` DStream.
Hash partitioning is used to generate the RDDs with `numPartitions`
partitions.
partitions.
"""
if numPartitions is None:
numPartitions = self.ctx.defaultParallelism
return self.transformWith(lambda a, b: a.join(b, numPartitions), other)

def leftOuterJoin(self, other, numPartitions=None):
"""
Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
`other` DStream.
Hash partitioning is used to generate the RDDs with `numPartitions`
partitions.
partitions.
"""
if numPartitions is None:
numPartitions = self.ctx.defaultParallelism
return self.transformWith(lambda a, b: a.leftOuterJoin(b, numPartitions), other)

def rightOuterJoin(self, other, numPartitions=None):
"""
Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
`other` DStream.
Hash partitioning is used to generate the RDDs with `numPartitions`
partitions.
partitions.
"""
if numPartitions is None:
numPartitions = self.ctx.defaultParallelism
return self.transformWith(lambda a, b: a.rightOuterJoin(b, numPartitions), other)

def fullOuterJoin(self, other, numPartitions=None):
"""
Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
`other` DStream.
Hash partitioning is used to generate the RDDs with `numPartitions`
partitions.
partitions.
"""
if numPartitions is None:
numPartitions = self.ctx.defaultParallelism
@@ -424,9 +425,9 @@ def window(self, windowDuration, slideDuration=None):
Return a new DStream in which each RDD contains all the elements in seen in a
sliding window of time over this DStream.
@param windowDuration width of the window; must be a multiple of this DStream's
@param windowDuration: width of the window; must be a multiple of this DStream's
batching interval
@param slideDuration sliding interval of the window (i.e., the interval after which
@param slideDuration: sliding interval of the window (i.e., the interval after which
the new DStream will generate RDDs); must be a multiple of this
DStream's batching interval
"""
@@ -448,13 +449,13 @@ def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuratio
2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
This is more efficient than `invReduceFunc` is None.
@param reduceFunc associative reduce function
@param invReduceFunc inverse reduce function of `reduceFunc`
@param windowDuration width of the window; must be a multiple of this DStream's
batching interval
@param slideDuration sliding interval of the window (i.e., the interval after which
the new DStream will generate RDDs); must be a multiple of this
DStream's batching interval
@param reduceFunc: associative reduce function
@param invReduceFunc: inverse reduce function of `reduceFunc`
@param windowDuration: width of the window; must be a multiple of this DStream's
batching interval
@param slideDuration: sliding interval of the window (i.e., the interval after which
the new DStream will generate RDDs); must be a multiple of this
DStream's batching interval
"""
keyed = self.map(lambda x: (1, x))
reduced = keyed.reduceByKeyAndWindow(reduceFunc, invReduceFunc,
@@ -478,12 +479,12 @@ def countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=Non
Return a new DStream in which each RDD contains the count of distinct elements in
RDDs in a sliding window over this DStream.
@param windowDuration width of the window; must be a multiple of this DStream's
@param windowDuration: width of the window; must be a multiple of this DStream's
batching interval
@param slideDuration sliding interval of the window (i.e., the interval after which
@param slideDuration: sliding interval of the window (i.e., the interval after which
the new DStream will generate RDDs); must be a multiple of this
DStream's batching interval
@param numPartitions number of partitions of each RDD in the new DStream.
@param numPartitions: number of partitions of each RDD in the new DStream.
"""
keyed = self.map(lambda x: (x, 1))
counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub,
@@ -495,12 +496,12 @@ def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None)
Return a new DStream by applying `groupByKey` over a sliding window.
Similar to `DStream.groupByKey()`, but applies it over a sliding window.
@param windowDuration width of the window; must be a multiple of this DStream's
@param windowDuration: width of the window; must be a multiple of this DStream's
batching interval
@param slideDuration sliding interval of the window (i.e., the interval after which
@param slideDuration: sliding interval of the window (i.e., the interval after which
the new DStream will generate RDDs); must be a multiple of this
DStream's batching interval
@param numPartitions Number of partitions of each RDD in the new DStream.
@param numPartitions: Number of partitions of each RDD in the new DStream.
"""
ls = self.mapValues(lambda x: [x])
grouped = ls.reduceByKeyAndWindow(lambda a, b: a.extend(b) or a, lambda a, b: a[len(b):],
@@ -519,15 +520,15 @@ def reduceByKeyAndWindow(self, func, invFunc, windowDuration, slideDuration=None
`invFunc` can be None, then it will reduce all the RDDs in window, could be slower
than having `invFunc`.
@param reduceFunc associative reduce function
@param invReduceFunc inverse function of `reduceFunc`
@param windowDuration width of the window; must be a multiple of this DStream's
@param reduceFunc: associative reduce function
@param invReduceFunc: inverse function of `reduceFunc`
@param windowDuration: width of the window; must be a multiple of this DStream's
batching interval
@param slideDuration sliding interval of the window (i.e., the interval after which
@param slideDuration: sliding interval of the window (i.e., the interval after which
the new DStream will generate RDDs); must be a multiple of this
DStream's batching interval
@param numPartitions number of partitions of each RDD in the new DStream.
@param filterFunc function to filter expired key-value pairs;
@param numPartitions: number of partitions of each RDD in the new DStream.
@param filterFunc: function to filter expired key-value pairs;
only pairs that satisfy the function are retained
set this to null if you do not want to filter
"""
@@ -567,7 +568,7 @@ def updateStateByKey(self, updateFunc, numPartitions=None):
Return a new "state" DStream where the state for each key is updated by applying
the given function on the previous state of the key and the new values of the key.
@param updateFunc State update function ([(k, vs, s)] -> [(k, s)]).
@param updateFunc: State update function ([(k, vs, s)] -> [(k, s)]).
If `s` is None, then `k` will be eliminated.
"""
if numPartitions is None:
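
The window-related docstrings above share the same knobs (windowDuration, slideDuration, numPartitions; the durations must be multiples of the batch interval), and reduceByKeyAndWindow additionally takes an inverse function so values leaving the window can be subtracted instead of recomputing the whole window. A short sketch continuing the stream from the previous example; the per-key (new_values, previous_state) form of the update function follows the stateful word-count pattern in Spark's Python examples and is an assumption here, not something this diff shows:

# Continues from `ssc` and `lines` above; checkpointing must be enabled
# for the inverse-reduce and state operations below.
pairs = lines.flatMap(lambda l: l.split(" ")).map(lambda w: (w, 1))

# 30-second window sliding every 10 seconds, both multiples of the
# 1-second batch interval, as the docstrings require.
windowed = pairs.reduceByKeyAndWindow(
    lambda a, b: a + b,    # reduce new values entering the window
    lambda a, b: a - b,    # "inverse reduce" values leaving the window
    30, 10)

# updateStateByKey: returning None for a key removes it from the state.
def update(new_values, last_state):
    return sum(new_values) + (last_state or 0)

running = pairs.updateStateByKey(update)

windowed.pprint()
running.pprint()
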
