Skip to content

Commit

Permalink
Merge pull request #37 from markheger/master
Browse files Browse the repository at this point in the history
0.7.0
  • Loading branch information
markheger committed Feb 5, 2020
2 parents cc5c416 + 068c7df commit c89541c
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 17 deletions.
4 changes: 2 additions & 2 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@
# built documents.
#
# The short X.Y version.
version = '0.6'
version = '0.7'
# The full version, including alpha/beta/rc tags.
release = '0.6.1'
release = '0.7.0'

# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
Expand Down
1 change: 0 additions & 1 deletion requirements.txt

This file was deleted.

2 changes: 1 addition & 1 deletion rtd/requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
streamsx
streamsx>=1.14.2a0,<2.0
2 changes: 1 addition & 1 deletion streamsx/standard/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__='0.6.1'
__version__='0.7.0'
61 changes: 49 additions & 12 deletions streamsx/standard/relational.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,22 +294,58 @@ def std(self, attribute, sample=False):
return self._output_func('SampleStdDev' if sample else 'PopulationStdDev', attribute)


class _Filter(Invoke):
class Filter(Invoke):
"""Removes tuples from a stream by passing along only those tuples that satisfy a user-specified condition.
Non-matching tuples can be sent to a second optional output.
The schema transformation is implemented using the ``spl.relational::Filter``
SPL primitive operator from the SPL Standard toolkit.
Example with one output stream::
import streamsx.standard.relational as R
import streamsx.standard.utility as U
topo = Topology()
s = U.sequence(topo, iterations=4)
matches = R.Filter.matching(s, filter='seq>=2ul')
Example with matching and non matching streams::
topo = Topology()
s = U.sequence(topo, iterations=4)
matches, non_matches = R.Filter.matching(s, filter='seq<2ul', non_matching=True)
"""
@staticmethod
def matching(stream, filter, name=None):
_op = Filter(stream, name=name)
_op.params['filter'] = _op.expression(filter);
return _op.outputs[0]
def matching(stream, filter, non_matching=False, name=None):
"""Filters input tuples to one or two output streams
def __init__(self, stream, filter=None, non_matching=False, name=None):
Args:
stream: Input stream
filter(str): Specifies that the condition that determines the tuples to be passed along by the Filter operator
non_matching(bool): Non-matching tuples are sent to a second optional output stream
name(str): Invocation name, defaults to a generated name.
Returns:
Stream: matching tuples (optional second stream for non matching tuples).
"""
_op = Filter(stream, non_matching, name=name)
if filter is not None:
_op.params['filter'] = _op.expression(filter);
if non_matching:
return _op.outputs[0], _op.outputs[1]
else:
return _op.outputs[0]

def __init__(self, stream, non_matching=False, name=None):
topology = stream.topology
kind="spl.relational::Filter"
inputs=stream
schema = stream.oport.schema
schemas = [schema,schema] if non_matching else schema
params = dict()
if filter is not None:
params['filter'] = filter
super(Filter, self).__init__(topology,kind,inputs,schemas,params,name)


Expand All @@ -323,6 +359,9 @@ class Functor(Invoke):
Example with schema transformation and two output streams::
import streamsx.standard.relational as R
import streamsx.standard.utility as U
topo = Topology()
s = U.sequence(topo, iterations=10) # schema is 'tuple<uint64 seq, timestamp ts>'
fo = R.Functor.map(s, [StreamSchema('tuple<uint64 seq>'),StreamSchema('tuple<timestamp ts>')])
Expand All @@ -344,8 +383,8 @@ def map(stream, schema, filter=None, name=None):
Args:
stream: Input stream
schema(str,StreamSchema): Schema of output stream.
filter(str): Attribute name to group aggregations.
schema(str,StreamSchema): Schema of output stream(s).
filter(str): Specifies the condition that determines which input tuples are to be operated on.
name(str): Invocation name, defaults to a generated name.
Returns:
Expand All @@ -361,8 +400,6 @@ def __init__(self, stream, schemas, filter=None, name=None):
kind="spl.relational::Functor"
inputs=stream
params = dict()
if filter is not None:
params['filter'] = filter
super(Functor, self).__init__(topology,kind,inputs,schemas,params,name)


Expand Down
36 changes: 36 additions & 0 deletions streamsx/standard/tests/test_relational.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,39 @@ def test_transform_schema_two_outputs(self):
tester.tuple_count(b, 2)
tester.test(self.test_ctxtype, self.test_config)


class TestFilter(TestCase):
def setUp(self):
Tester.setup_standalone(self)


def test_single_output(self):
topo = Topology()
s = U.sequence(topo, iterations=4)
matches = R.Filter.matching(s, filter='seq<2ul')

tester = Tester(topo)
tester.tuple_count(matches, 2)
tester.test(self.test_ctxtype, self.test_config)


def test_non_matching_output(self):
topo = Topology()
s = U.sequence(topo, iterations=4)
matches, non_matches = R.Filter.matching(s, filter='seq<2ul', non_matching=True)

tester = Tester(topo)
tester.tuple_count(matches, 2)
tester.tuple_count(non_matches, 2)
tester.test(self.test_ctxtype, self.test_config)

def test_filter_none(self):
topo = Topology()
s = U.sequence(topo, iterations=4)
matches = R.Filter.matching(s, filter=None)

tester = Tester(topo)
tester.tuple_count(matches, 4)
tester.test(self.test_ctxtype, self.test_config)


0 comments on commit c89541c

Please sign in to comment.