quick-fix: Ensure schema is retained after a batching step #131
Conversation
fpacifici left a comment:
Please see the comments inline.
```python
    schema=None,
)

if isinstance(payload, MutableSequence) and isinstance(payload[0], tuple):
```
Why only a MutableSequence? Shouldn't this work with Sequences as well?
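For illustration, a broader check could look like the sketch below. This is hypothetical and not part of the PR; the `str`/`bytes` exclusion is an assumption added because both are also `Sequence`s of their own elements.

```python
from collections.abc import Sequence


def is_tuple_batch(payload: object) -> bool:
    # Accept any Sequence (list, tuple, ...) whose first element is a tuple,
    # while excluding str/bytes, which are Sequences of characters/ints.
    return (
        isinstance(payload, Sequence)
        and not isinstance(payload, (str, bytes))
        and len(payload) > 0
        and isinstance(payload[0], tuple)
    )
```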
```python
if isinstance(payload, MutableSequence) and isinstance(payload[0], tuple):
    batch = []
    schema = None
    for tup in payload:
        batch.append(tup[0])

    schema = payload[0][1]

    msg = PyMessage(
        payload=batch,
        headers=[],
        timestamp=timestamp,
        schema=schema,
    )
else:
    msg = PyMessage(
        payload=payload,
        headers=[],
        timestamp=timestamp,
        schema=None,
    )
```
Please let's write unit tests for this logic (there is already a test_reduce_delegate you can add onto).
As this is a bit of a hack, a good way to keep it under control is to have this behavior well tested, so that when we refactor it we do not miss corner cases.
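A rough sketch of what such tests could assert is below. The helper name `build_message` and the stand-in `PyMessage` dataclass are assumptions used only to make the example self-contained; the real tests would extend the existing test_reduce_delegate rather than redefine the logic.

```python
from collections.abc import MutableSequence
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple


@dataclass
class PyMessage:
    # Stand-in mirroring only the fields used in the diff above.
    payload: Any
    headers: List[Tuple[str, bytes]]
    timestamp: float
    schema: Optional[str]


def build_message(payload: Any, timestamp: float) -> PyMessage:
    # Hypothetical helper containing the quick-fix logic: when the payload is a
    # batch of (value, schema) tuples, unpack the values and keep the schema of
    # the first element; otherwise pass the payload through with no schema.
    if isinstance(payload, MutableSequence) and isinstance(payload[0], tuple):
        batch = [tup[0] for tup in payload]
        schema = payload[0][1]
        return PyMessage(payload=batch, headers=[], timestamp=timestamp, schema=schema)
    return PyMessage(payload=payload, headers=[], timestamp=timestamp, schema=None)


def test_schema_retained_after_batching() -> None:
    batched = [(b"a", "events"), (b"b", "events")]
    msg = build_message(batched, timestamp=1.0)
    assert msg.payload == [b"a", b"b"]
    assert msg.schema == "events"


def test_unbatched_payload_has_no_schema() -> None:
    msg = build_message(b"a", timestamp=1.0)
    assert msg.payload == b"a"
    assert msg.schema is None
```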
Will do. Initially I just wanted to check whether we're okay with this sort of hack in the code.
This test is already checking what we need here. I made it a bit more "complex".
A really simple way to unblock the work where a batching step happens before parsing. I can also introduce the concept of Schema in this PR, where we have an enum of all the allowed schemas / topic names.

We should not stick with this in the long term; we should likely enforce that all Accumulators/Reduce steps output a Tuple[MutableSequence[InputType], Schema]. Making that change is more significant from a typing perspective, as it involves making sure primitives like FlatMap are also able to handle that output type.
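To illustrate the long-term direction described above, here is a minimal sketch of what the Schema enum and the typed accumulator output could look like. The enum members and the `Accumulator` protocol are assumptions for illustration, not the project's actual API.

```python
from enum import Enum
from typing import MutableSequence, Protocol, Tuple, TypeVar


class Schema(Enum):
    # Hypothetical members; in practice this would enumerate the allowed
    # schemas / topic names used by the pipelines.
    EVENTS = "events"
    INGEST_METRICS = "ingest-metrics"


InputType = TypeVar("InputType")


class Accumulator(Protocol[InputType]):
    # Hypothetical protocol: every Accumulator/Reduce step would emit its
    # accumulated batch together with the schema of the batched elements,
    # so downstream steps (e.g. FlatMap) can propagate the schema.
    def get_value(self) -> Tuple[MutableSequence[InputType], Schema]:
        ...
```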