[FEATURE] non time-based windows / epochs #284
Comments
If I understood correctly, you can achieve what you want by writing a custom input source. Then, you can use the
As an example, you can add a

from datetime import datetime, timedelta, timezone

from bytewax.connectors.stdio import StdOutput
from bytewax.dataflow import Dataflow
from bytewax.inputs import DynamicInput, StatelessSource


class PeriodicSource(StatelessSource):
    def __init__(self, frequency):
        self.frequency = frequency
        self._next_awake = datetime.now(timezone.utc)
        self._counter = 0

    def next_awake(self):
        return self._next_awake

    def next_batch(self):
        self._counter += 1
        delay = datetime.now(timezone.utc) - self._next_awake
        self._next_awake += self.frequency
        return [(self._counter, f"delay (ms): {delay.total_seconds() * 1000:.3f}")]


class PeriodicInput(DynamicInput):
    def __init__(self, frequency):
        self.frequency = frequency

    def build(self, worker_index, worker_count):
        return PeriodicSource(frequency=self.frequency)


def reducer(acc, data):
    if isinstance(acc, list):
        acc.append(data)
    else:
        acc = [acc, data]
    return acc


def is_complete(acc):
    if not isinstance(acc, list):
        return False
    counter = acc[-1][0]
    return counter % 5 == 0


stateless_flow = Dataflow()
stateless_flow.input("periodic", PeriodicInput(timedelta(seconds=1)))
stateless_flow.map(lambda x: ("ALL", x))
stateless_flow.reduce("counter_window", reducer, is_complete)
stateless_flow.output("stdout", StdOutput())

Is that in line with what you wanted?
Thanks for the reply @Psykopear! I tried to run your example, but it didn't work. I got this error
I tried changing

- def next_batch(self):
+ def next(self):

to match the API docs. Then I ended up with this error
at which point I stopped trying to debug. I guess I still don't understand whether your example is still using a time-based clock or not. There is still a lot of time-related stuff in the code. Perhaps it would be helpful if you could explain the Source API a bit more. How does one control the timestamp associated with a Source when emitting events?
Sorry, I see now that the example is a bit out of scope regarding what you asked.
Not really, it's just using datetime to emit an event each second, but that's unrelated to your question, and it's probably making things less clear.
You don't have direct control over timely's epochs/timestamps, since we use those internally and hide them from the Python layer. So if you want to use a sequence number as a reference to window items or anything else, you need to add it to the data, but I see that you already did this with the
The point of my previous answer is that you can manually implement windowing logic using the
But I see your point now: having windowing operators able to work with any kind of orderable/comparable data type makes sense, but it will need some changes in the API of the operators. We'll talk about this with the rest of the team. Thanks for opening this, and sorry for the initial confusion.
Just for completeness, here's a simplified version of the previous example. I use the

from bytewax.connectors.stdio import StdOutput
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingInput


def reducer(acc, data):
    if isinstance(acc, list):
        acc.append(data)
    else:
        acc = [acc, data]
    return acc


def is_complete(acc):
    if not isinstance(acc, list):
        return False
    sequence_number = acc[-1]
    return sequence_number % 5 == 0


flow = Dataflow()
flow.input("sequence_number", TestingInput(range(100)))
# Add a placeholder key for the stateful operator
flow.map(lambda x: ("ALL", x))
flow.reduce("accumulator", reducer, is_complete)
flow.output("stdout", StdOutput())
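To see what the reducer / is_complete pair above would group together, here is a minimal sketch that replays the same two functions in plain Python, without Bytewax. The helper simulate_reduce is hypothetical and not part of the library. It assumes that a keyed reduce step takes the first value for a key as the initial accumulator, checks is_complete after every update, and discards the accumulator once it has been emitted. Check that assumption against the docs for your Bytewax version.

```python
def reducer(acc, data):
    if isinstance(acc, list):
        acc.append(data)
    else:
        acc = [acc, data]
    return acc


def is_complete(acc):
    if not isinstance(acc, list):
        return False
    return acc[-1] % 5 == 0


def simulate_reduce(items, reducer, is_complete):
    """Hypothetical stand-in for a reduce step over a single key."""
    emitted = []
    acc = None
    has_acc = False
    for item in items:
        # Assumption: the first value for a key becomes the accumulator as-is.
        acc = reducer(acc, item) if has_acc else item
        has_acc = True
        if is_complete(acc):
            emitted.append(acc)
            acc, has_acc = None, False
    leftover = acc if has_acc else None
    return emitted, leftover


windows, leftover = simulate_reduce(range(12), reducer, is_complete)
print(windows)   # groups that were emitted
print(leftover)  # accumulator still waiting for its closing item
```

Under those assumptions the first group holds six items (0 through 5) and later groups hold five, because a group closes on the item whose sequence number is a multiple of 5 rather than before it. Whether that boundary is acceptable depends on the use case.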
Thanks so much for your reply.
Yes, exactly, this is exactly what I was getting at. But I have a completely functional workaround for now, so I would not consider this urgent. Thanks for taking the time to help me understand the library better. It's very cool!
Unfortunately this is not currently implemented in the general case; our windowing code specifically requires datetimes.

Discussion

It's possible, but we'd have to move much of our windowing implementation from Rust into Python to allow for dynamic types in the relevant spots.

Also note that windowing in a streaming system can't be totally divorced from system time because of late, missing, and out-of-order data. Because a stream could be infinite, you need to ask "should I wait longer for possible out-of-order data?" The way this is traditionally done is via some sort of system time lateness timeout: you delay the closing of a window (whatever that means, even if it's not based on datetimes) until e.g. 10 more seconds have passed to be able to incorporate any out-of-order data that should have been in that window.

Very inside baseball for this hypothetical implementation: This would be implemented via some new concept "logical watermark advancement"? You'd need a way of translating how the logical watermark of the clock should advance as system time marches forward, since the watermark is how you know a window is truly closed. The current event time clock (since the watermark is a datetime itself) just adds the system time duration to the old watermark.

You might already be familiar, but if you're interested in more info on timely streaming, Flink has some great visualizations on how these concepts work in their docs. Bytewax is not implemented in exactly the same way, but the concepts are the same.

Hacky Hacky Hack

You said you came up with some workaround, but to add possibly another: if you don't care about system latency (you'll only close a logical window once an item of the next window shows up, even if that takes forever in wall clock time) and can guarantee data will not be out-of-order (not possible if using multi-partitioned input), you could re-create a lightweight version of logical tumbling windowing using the

WINDOW_WIDTH = 5

# Integer division maps each sequence number to the ID of its tumbling window.
flow.map(lambda item: (item.sequence_number // WINDOW_WIDTH, item))
flow.map(lambda win_item: ("ALL", win_item))


def builder():
    # Assumes that item.sequence_number >= 0 so -1 will never be a window ID.
    return (-1, [])


def windower(win_l, win_item):
    current_win, l = win_l
    item_win, item = win_item
    if item_win > current_win:
        # Window just closed because we saw an item in the next window;
        # set up the new window and emit the old one.
        return (item_win, [item]), l
    else:
        # Window still open; the value emitted downstream is an empty list.
        l.append(item)
        return win_l, []


flow.stateful_map("logical_tumbling_windower", builder, windower)

If you want to handle partial out-of-order data, you could add in a watermark to this, but it gets complicated fast.
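As a rough illustration of the watermark idea mentioned above, the following sketch keeps several logical windows open at once and closes a window only after the highest sequence number seen has moved a fixed distance past the window's end. It is plain Python driven by a loop, not a Bytewax operator. ALLOWED_LATENESS, the state layout, and the run helper are all hypothetical choices made for this example. The builder / windower pair keeps the same (new_state, output) shape as the snippet above, so it could probably be adapted to a stateful step, but that is untested here.

```python
WINDOW_WIDTH = 5
# Hypothetical tolerance: how many sequence numbers behind the newest
# item an out-of-order item may arrive and still be accepted.
ALLOWED_LATENESS = 2


def builder():
    # (highest sequence number seen, open windows by ID, first window ID not yet closed)
    return (-1, {}, 0)


def windower(state, seq):
    max_seq, open_windows, next_open = state
    win_id = seq // WINDOW_WIDTH
    if win_id < next_open:
        # Too late: this item's window was already emitted, so drop it.
        return state, []
    open_windows.setdefault(win_id, []).append(seq)
    max_seq = max(max_seq, seq)
    # Logical watermark: we no longer expect items at or below this value.
    watermark = max_seq - ALLOWED_LATENESS
    # Every window that ends at or before the watermark can be closed.
    next_open = max(next_open, watermark // WINDOW_WIDTH)
    closed = [
        (wid, open_windows.pop(wid))
        for wid in sorted(open_windows)
        if wid < next_open
    ]
    return (max_seq, open_windows, next_open), closed


def run(seqs):
    """Feed sequence numbers through the windower and collect closed windows."""
    state = builder()
    out = []
    for seq in seqs:
        state, closed = windower(state, seq)
        out.extend(closed)
    return out


# 5 arrives after 6 and is still accepted; the second 1 arrives after
# window 0 has closed and is dropped.
result = run([0, 1, 3, 2, 4, 6, 5, 7, 8, 1, 9, 12])
print(result)
```

Note that this watermark advances only when new data arrives, so it shares the latency caveat described above: a window stays open until later items show up, however long that takes in wall clock time.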
Thanks for this amazing library! 🙏 I'm very excited to be able to play with timely dataflow concepts from Python.
Is your feature request related to a problem? Please describe.
One thing I like about the timely dataflow paradigm is this definition of "timestamps"
I would like to do exactly this with bytewax: create a clock that is just based on an integer sequence number (which I control from another system which is emitting records to be processed). I don't care about actual date. But I still want to apply windowed streaming operations over this dimension.
I read the guides and the API docs for windows, which state right at the top: "Time-based windows." I could not find any examples of anything other than time-based windows being supported.
Describe the solution you'd like
If this is supported, I'd like to see documentation about how to create events and windows using a "clock" that is not datetime-based at all but rather uses a custom, user-defined integer-based clock. I'm thinking of something like the example from section 1.2 of the timely dataflow book.
If it is not yet supported, I'd be curious to know how hard it would be to add.
Describe alternatives you've considered
I have considered just spoofing what I want by abusing datetimes. I could define an EventClockConfig that converts my integer to some known timestamp, e.g.
However, this feels inefficient. Working with datetimes will always be slower and more error-prone than with the much simpler integer type.
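For readers unfamiliar with the datetime-spoofing idea, here is a minimal sketch of what such a mapping could look like. It is not the snippet from the original post, which is not reproduced here. EPOCH, TICK, and the function names are hypothetical; the anchor date is arbitrary. A function like seq_to_datetime is the kind of thing one would hand to an event-time clock as its datetime getter, but the exact EventClockConfig arguments are not shown and should be checked against the Bytewax docs.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical anchor: sequence number 0 maps to this instant.
EPOCH = datetime(2020, 1, 1, tzinfo=timezone.utc)
# Hypothetical scale: one sequence number equals one second.
TICK = timedelta(seconds=1)


def seq_to_datetime(seq):
    """Map an integer sequence number to a fake, timezone-aware timestamp."""
    return EPOCH + seq * TICK


def datetime_to_seq(dt):
    """Inverse mapping, useful for turning window bounds back into integers."""
    return (dt - EPOCH) // TICK


def window_index(seq, width=5):
    """Index of the tumbling window (aligned to EPOCH) that seq falls into."""
    return (seq_to_datetime(seq) - EPOCH) // (width * TICK)


print(seq_to_datetime(7))
print(datetime_to_seq(seq_to_datetime(42)))
print(window_index(7))
```

With this scale, a time-based tumbling window of 5 seconds aligned to EPOCH covers exactly 5 consecutive sequence numbers.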