Skip to content

Commit

Permalink
Merge pull request #46 from markheger/master
Browse files Browse the repository at this point in the history
0.9.3 - documentation update
  • Loading branch information
markheger committed Feb 14, 2020
2 parents 20b2a53 + a82d164 commit 4f7e327
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 20 deletions.
2 changes: 1 addition & 1 deletion build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<fileset dir="${basedir}" includes="**/tk*/**"/>
</delete>
<delete includeemptydirs="true">
<fileset dir="${package}" includes="**/build/**,dist/**,streamsx.*.egg-info/**,job*.tar.gz"/>
<fileset dir="${package}" includes="**/build/**,**/generated/**,dist/**,streamsx.*.egg-info/**,job*.tar.gz"/>
</delete>
<delete includeemptydirs="true">
<fileset dir="${basedir}" includes="**/nose_runs/**"/>
Expand Down
7 changes: 6 additions & 1 deletion docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
'sphinx.ext.autodoc',
'sphinx.ext.autosummary',
'sphinx.ext.napoleon',
'sphinx.ext.intersphinx',
]

autodoc_default_flags = ['members', 'show-inheritance']
Expand Down Expand Up @@ -67,7 +68,7 @@
# The short X.Y version.
version = '0.9'
# The full version, including alpha/beta/rc tags.
release = '0.9.2'
release = '0.9.3'

# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
Expand Down Expand Up @@ -343,3 +344,7 @@
# If true, do not generate a @detailmenu in the "Top" node's menu.
#
# texinfo_no_detailmenu = False

# -- Interspinx references -------------------------------------------

intersphinx_mapping = {'topology_ref': ('https://streamsxtopology.readthedocs.io/en/stable/', None)}
2 changes: 1 addition & 1 deletion streamsx/standard/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__='0.9.2'
__version__='0.9.3'
79 changes: 62 additions & 17 deletions streamsx/standard/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,25 @@ def spray(stream, count, queue=1000, name=None):
then processing of the input stream is blocked until there
is space in the queue.
Example, spray the source tuples to 8 streams::
from streamsx.topology.topology import Topology
import streamsx.standard.utility as U
topo = Topology()
s = topo.source(U.Sequence())
outs = []
for so in U.spray(s, count=8):
outs.append(so.map(lambda x : (x['seq'], x['ts']), schema=U.SEQUENCE_SCHEMA))
s = outs[0].union(set(outs))
Args:
count(int): Number of output streams the input stream will be sprayed across.
queue(int): Maximum queue size.
name(str): Name of the stream, if `None` a generated name is used.
Returns:
list(Stream) : List of output streams.
list(:py:class:`topology_ref:streamsx.topology.topology.Stream`) : List of output streams.
"""
_op = _ThreadedSplit(stream, count, queue,name=name)
return _op.outputs
Expand Down Expand Up @@ -185,18 +197,30 @@ def union(inputs, schema, name=None):
the output schemas and the input schemas may contain additional
attributes which will be discarded.
Example, the output of :py:meth:`~streamsx.standard.utility.union` contains attribute ``c`` only::
from streamsx.topology.topology import Topology
import streamsx.standard.utility as U
topo = Topology()
...
# schema of stream a: 'tuple<int32 a, int32 c>'
# schema of stream b: 'tuple<int32 c, int32 b>'
r = U.union([a,b], schema='tuple<int32 c>')
.. note::
This method differs from ``Stream.union`` in that
This method differs from :py:meth:`topology_ref:streamsx.topology.topology.Stream.union` in that
the schemas of input and output streams can differ, while
``Stream.union`` requires matching input and output schemas.
:py:meth:`~streamsx.standard.utility.union` requires matching input and output attributes.
Args:
inputs(list[Stream]): Streams to be unioned.
schema(StreamSchema): Schema of output stream
inputs(list[:py:class:`topology_ref:streamsx.topology.topology.Stream`]): Streams to be unioned.
schema(:py:class:`topology_ref:streamsx.topology.schema.StreamSchema`): Schema of output stream
name(str): Name of the stream, if `None` a generated name is used.
Returns:
Stream: Stream that is a union of `inputs`.
:py:class:`topology_ref:streamsx.topology.topology.Stream`: Stream that is a union of `inputs`.
"""
_op = _Union(inputs, schema, name=name)
Expand Down Expand Up @@ -226,6 +250,15 @@ class Deduplicate(streamsx.topology.composite.Map):
Args:
count(int): Number of tuples.
period(float): Time period to check for duplicates.
Example discarding duplicate tuples wth `a=1` and `a=2`::
import streamsx.standard.utility as U
topo = Topology()
s = topo.source([1,2,1,4,5,2,6,3,7,8,9])
s = s.map(lambda v : {'a':v}, schema='tuple<int32 a>')
s = s.map(U.Deduplicate(count=10))
"""
def __init__(self, count:int=None, period:float=None):
self.count = count
Expand Down Expand Up @@ -302,7 +335,7 @@ def pair(stream0, stream1, matching=None, name=None):
"""Pair tuples across two streams.
This method is used to merge results from performing
parallel tasks on the same stream, for example peform multiple
parallel tasks on the same stream, for example perform multiple
model scoring on the same stream.
Holds tuples on the two input streams until a matched tuple has been
Expand Down Expand Up @@ -347,21 +380,21 @@ def pair(stream0, stream1, matching=None, name=None):
cust_churn_renew = U.pair(cust_churn, cust_renew, matching='id');
Args:
stream0(Stream): First input stream.
stream1(Stream): Second input stream.
stream0(:py:class:`topology_ref:streamsx.topology.topology.Stream`): First input stream.
stream1(:py:class:`topology_ref:streamsx.topology.topology.Stream`): Second input stream.
matching(str): Attribute name for matching tuples.
name(str): Name of resultant stream, defaults to a generated name.
Returns:
Stream: Paired stream.
:py:class:`topology_ref:streamsx.topology.topology.Stream`: Paired stream.
"""
return merge([stream0, stream1], matching, name)

def merge(inputs, matching=None, name=None):
"""Merge tuples across two (or more) streams.
This method is used to merge results from performing
parallel tasks on the same stream, for example peform multiple
parallel tasks on the same stream, for example perform multiple
model scoring on the same stream.
Holds tuples on the input streams until a matched tuple has been
Expand All @@ -386,12 +419,12 @@ def merge(inputs, matching=None, name=None):
* ``CommonSchema.Json``
Args:
inputs(list[Stream]): Input streams to be matched.
inputs(list[:py:class:`topology_ref:streamsx.topology.topology.Stream`]): Input streams to be matched.
matching(str): Attribute name for matching.
name(str): Name of resultant stream, defaults to a generated name.
Returns:
Stream: Merged stream.
:py:class:`topology_ref:streamsx.topology.topology.Stream`: Merged stream.
"""
_op = _Pair(inputs, matching, name=name)
return _op.outputs[0]
Expand Down Expand Up @@ -421,17 +454,29 @@ def gate(stream, control, max_unacked=1, ack_count=1, name=None):
tuples reaches `max_unacked` again.
The output of some downstream processing is typically used as `control`
and thus `control` is usually a stream obtained from a `streamsx.topology.topology.PendingStream`.
and thus `control` is usually a stream obtained from a :py:class:`topology_ref:streamsx.topology.topology.PendingStream`.
Example with feedback loop::
import streamsx.standard.utility as U
topo = Topology()
s = topo.source(range(100))
c = PendingStream(topo)
g = U.gate(s, c.stream, max_unacked=1)
g = g.map(lambda _ : time.time())
r = g.map(U.Delay(delay=1.0))
c.complete(r)
Args:
stream(Stream): Stream to be gated.
control(Stream): Controlling stream.
stream(:py:class:`topology_ref:streamsx.topology.topology.Stream`): Stream to be gated.
control(:py:class:`topology_ref:streamsx.topology.topology.Stream`): Controlling stream.
max_unacked(int): Maximum of tuples allowed through the gate without acknowledgement.
ack_count(int): Count of tuples to acknowledge with each tuple arriving on `control`.
name(str): Name of resultant stream, defaults to a generated name.
Returns:
Stream: Gated stream.
:py:class:`topology_ref:streamsx.topology.topology.Stream`: Gated stream.
"""
ack_count = uint32(ack_count)
_op = _Gate([stream,control], maxUnackedTupleCount=max_unacked, numTuplesToAck=ack_count,name=name)
Expand Down

0 comments on commit 4f7e327

Please sign in to comment.