BEAM-7141: add key value timer callback #8739
merging from upstream
StateParam = _StateDoFnParam
TimerParam = _TimerDoFnParam
KeyParam = _DoFnParam('KeyParam')
Do we need to add all these args to the DoFnProcessParams list? Its only use so far is to check that these args are not used for bundle methods.
sounds good.
@@ -183,6 +184,8 @@ def __init__(self, obj_to_invoke, method_name):
        self.timestamp_arg_name = kw
      elif v == core.DoFn.WindowParam:
        self.window_arg_name = kw
      elif v == core.DoFn.KeyParam:
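As a rough illustration of the pattern in this hunk: the wrapper scans a method's default argument values for sentinel parameters and records the keyword name each one is bound to, so the invoker can later substitute the real value. The sketch below is standalone Python, not Beam code; `KEY_PARAM`, `WINDOW_PARAM`, `find_param_names`, `invoke_timer_callback`, and `on_expiry` are hypothetical stand-ins introduced only for this example.

```python
import inspect

# Hypothetical sentinels standing in for DoFn.KeyParam / DoFn.WindowParam.
KEY_PARAM = object()
WINDOW_PARAM = object()


def find_param_names(method):
    """Map each sentinel found among the defaults to its keyword name."""
    names = {}
    for kw, param in inspect.signature(method).parameters.items():
        if param.default is KEY_PARAM:
            names['key'] = kw
        elif param.default is WINDOW_PARAM:
            names['window'] = kw
    return names


def invoke_timer_callback(method, key, window):
    """Call the method, replacing sentinel defaults with real values."""
    names = find_param_names(method)
    kwargs = {}
    if 'key' in names:
        kwargs[names['key']] = key
    if 'window' in names:
        kwargs[names['window']] = window
    return method(**kwargs)


# A callback written the way a user might declare a timer callback.
def on_expiry(user_key=KEY_PARAM, win=WINDOW_PARAM):
    return 'timer fired for key=%r in window=%r' % (user_key, win)
```

Calling `invoke_timer_callback(on_expiry, 'k1', 'w1')` passes the key and window under whatever argument names the user chose.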
Could you also update the PerWindowInvoker (for process methods) (https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/common.py#L597)? (If possible, please also update it for the window param that was added in the previous PR.)
@aaltay
I may not have the complete background, but in my understanding these parameters are mainly for the timer callback method and may not be exactly the same as the process parameters. Do we still need to update _invoke_process_per_window? Am I missing something?
You have a valid point. There may not always be a key that can be passed to the process() method, although sometimes there might be.
Now that we have a KeyParam, a user might write a process method using it, e.g. process(mykey=KeyParam). What would be the reasonable thing to do in this case?
- We can do nothing and mykey will literally have the value KeyParam.
- We can fail with an error saying that KeyParam is not a valid parameter for process method.
- We can try to pass the key (e.g. k, v = element) and set mykey = k. And if that fails (i.e. element is not a K,V) we can fail.
I was leaning towards the third option. I think the second option would also be fine, and the first option would be confusing.
What do you think?
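A minimal sketch of the third option, assuming the element reaches the invoker as a plain Python object: try to unpack it as a (key, value) pair and raise a clear error otherwise. This is standalone Python rather than the Beam implementation; `KEY_PARAM`, `resolve_key`, and `call_process` are hypothetical names used only here.

```python
# Hypothetical sentinel standing in for DoFn.KeyParam.
KEY_PARAM = object()


def resolve_key(element):
    """Option 3: treat the element as a (key, value) pair or fail loudly."""
    try:
        key, _ = element
    except (TypeError, ValueError):
        raise ValueError(
            'KeyParam requires a (key, value) element, got %r' % (element,))
    return key


def call_process(process_fn, element, key_arg_name=None):
    """Invoke process_fn, filling key_arg_name with the element's key."""
    kwargs = {}
    if key_arg_name is not None:
        kwargs[key_arg_name] = resolve_key(element)
    return process_fn(element, **kwargs)


# A user-style process method that asks for the key.
def process(element, mykey=KEY_PARAM):
    return mykey
```

With `key_arg_name` left unset, `mykey` keeps the sentinel itself, which is the confusing first option. Plain unpacking also accepts any two-item iterable (for example a two-character string), so a real implementation may want a stricter check.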
Is there any way to know at pipeline construction time that the user is using a KV as the input type, so that we don't have to defer to pipeline execution time to get an error? If not, I like Ahmet's third suggestion as well.
It is possible to do this at pipeline construction time. This would be similar to how type hint detection happens. (It would be inspection at construction time rather than inspection at execution time.) However, we do not have the machinery in place, and I suspect it would make this PR more complicated than intended.
We can start with an execution-time error and later improve it to be a construction-time error. We can do this backward compatibly by failing at construction time only when the input type hints are not in the [K, V] form, preserving the same functionality.
I am also leaning towards option 3. If the implementation scope turns out to be bigger than expected, I would fall back to option 2 and complete option 3 in a separate PR.
The type inference machinery can tell us whether we certainly have a KV type, whether we certainly do not have a KV type, or that it doesn't know whether we have a KV type (e.g. all it knows is that it's a Python object).
I also like option 3, and don't think it would be that hard.
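The three-way answer described above (certainly KV, certainly not KV, unknown) could be sketched as follows. This is an assumption-laden illustration that uses the standard library `typing` module in place of Beam's own type hint machinery; `KvStatus`, `classify_kv`, and `should_fail_at_construction` are hypothetical names.

```python
import enum
import typing


class KvStatus(enum.Enum):
    YES = 'certainly a KV'
    NO = 'certainly not a KV'
    UNKNOWN = 'cannot tell'


def classify_kv(hint):
    """Classify a type hint as KV, not KV, or unknown."""
    if hint is typing.Any or hint is object:
        return KvStatus.UNKNOWN  # all we know is "some Python object"
    if typing.get_origin(hint) is tuple:
        args = typing.get_args(hint)
        if not args or Ellipsis in args:
            return KvStatus.UNKNOWN  # bare or variable-length tuple
        return KvStatus.YES if len(args) == 2 else KvStatus.NO
    return KvStatus.NO


def should_fail_at_construction(hint):
    """Fail early only when the hint is certainly not a KV."""
    return classify_kv(hint) is KvStatus.NO
```

Failing only on `KvStatus.NO` keeps the backward-compatible behavior discussed earlier: unknown hints still fall through to the execution-time check.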
Why? The key parameter was missing from the timer callback, which made debugging harder.
a3f4406 to c7ebfa6
R: @aaltay
Thank you!
* BEAM-7141: Add key parameter in timer callback
The key parameter was missing from the timer callback method. This makes debugging harder because the developer does not have enough context about the callback and its associated data. Passing the key and window parameters gives the callback enough information for the developer to debug issues.