-
-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Allow sending committable from Unfold #371
fix: Allow sending committable from Unfold #371
Conversation
next_message = Message(Value(value, {}, message.timestamp)) | ||
for value in iterable: | ||
next_message = Message( | ||
value=cast(Value[TOutput], value), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this cast should be necessary, more likely somewhere else the types weren't updated
@mj0nez I haven't looked at the PR in detail yet since your recent changes, do you need help with the types, or what is the status? I can make some room this week to continue it if so |
Yeah... it would be great if you could take over. |
@untitaker Did you get a chance to look at the PR? :) |
discard my earlier comment. this is done now. thanks! |
This PR aligns the
Unbatch
andUnfold
strategies to the behavior of their counterpartsBatch
andReduce
.Unbatch
now just submits the values of a submittedValuesBatch
one after another to the next step. Therefore, the logic was reduced to rely fully on the passed generator function for building aMessage
'sValue
.I tried my best fixing the typing issues but there is still one in
arroyo/processing/strategies/batching.py:116
with which I had no luck.Closes #369