diff --git a/hyperstream/tools/apply/2017-11-27_v0.0.3.py b/hyperstream/tools/apply/2017-11-27_v0.0.3.py new file mode 100644 index 00000000..8c4e587c --- /dev/null +++ b/hyperstream/tools/apply/2017-11-27_v0.0.3.py @@ -0,0 +1,38 @@ +# The MIT License (MIT) # Copyright (c) 2014-2017 University of Bristol +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE +# OR OTHER DEALINGS IN THE SOFTWARE. + +from hyperstream.stream import StreamInstance +from hyperstream.tool import Tool, check_input_stream_count + + +class Apply(Tool): + """ + Simple tool that applies a function to every data item + """ + def __init__(self, func, include_timestamps=False): + super(Apply, self).__init__(func=func, include_timestamps=include_timestamps) + + @check_input_stream_count(1) + def _execute(self, sources, alignment_stream, interval): + for t, d in sources[0].window(interval, force_calculation=True): + if self.include_timestamps: + yield StreamInstance(t, self.func(t, d)) + else: + yield StreamInstance(t, self.func(d))