[BEAM-5953] Fix py3 type error in bundle_processor #7521
+R: @robertwb @tvalentyn
@@ -590,9 +590,11 @@ def get_coder(self, coder_id):
     if coder_proto.spec.spec.urn:
       return self.context.coders.get_by_id(coder_id)
     else:
+      payload = coder_proto.spec.spec.payload
+      if isinstance(payload, bytes):
Do we need the if? Is it not always bytes?
I'm not a hundred per cent sure. @robertwb Do we need a check here?
Since this decoding is only needed in Python 3, I prefer to add a version check here. Same for the changes to operation_specs.
@@ -354,7 +354,7 @@ def get_coder_from_spec(coder_spec):
   # We pass coders in the form "<coder_name>$<pickled_data>" to make the job
   # description JSON more readable.
-  return coders.coders.deserialize_coder(coder_spec['@type'])
+  return coders.coders.deserialize_coder(coder_spec['@type'].encode('utf-8'))
deserialize_coder only accepts bytes. If we decode the payload above, we need to convert it back to bytes here.
Force-pushed from 0e90215 to ee9fbac.
@@ -354,7 +354,10 @@ def get_coder_from_spec(coder_spec):
   # We pass coders in the form "<coder_name>$<pickled_data>" to make the job
   # description JSON more readable.
-  return coders.coders.deserialize_coder(coder_spec['@type'])
+  coder = coder_spec['@type']
+  if not isinstance(coder, bytes):
Do we understand in which codepath we happen to populate coders_spec without encoding it to bytes? If so, can we encode at the creation time? I think it would be easier to reason about SDK internals if we can state that this method always expects the same datatype (a bytestring) as input.
Here coder_spec is a cloud object dictionary, typically parsed from JSON, hence the unicode. We can unconditionally encode this for deserialization, but it's quite possible that utf-8 would not be the "right" encoding in this case for pickle.
Due to issues with passing arbitrary bytes through cloud protos, we actually base64 encode our serialized data in internal.pickler.loads/dumps, including here. As such, it should be safe to encode this with 'ascii', which would throw errors if there happen to be any higher code points (there should not be, but if any creep in, it'd be better to have an explicit error here than a harder-to-decipher one later).
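To illustrate the point about 'ascii', here is a minimal standalone sketch using only the standard library. It does not call Beam's internal.pickler; the helper name encode_spec_for_deserialization and the sample data are hypothetical, and the sketch only assumes that the serialized data is base64 text that has passed through JSON.

```python
import base64
import json
import pickle


def encode_spec_for_deserialization(spec_text):
    """Hypothetical helper: turn a JSON-derived text string into bytes.

    Assumes the text holds base64 data, so every character must be ASCII.
    """
    if isinstance(spec_text, bytes):
        return spec_text
    # Raises UnicodeEncodeError if a non-ASCII code point has crept in.
    return spec_text.encode('ascii')


# Simulate the round trip: pickle -> base64 -> JSON -> text -> bytes -> object.
original = {'coder': 'example', 'args': [1, 2, 3]}
serialized = base64.b64encode(pickle.dumps(original))
cloud_object = json.loads(json.dumps({'@type': serialized.decode('ascii')}))

recovered_bytes = encode_spec_for_deserialization(cloud_object['@type'])
recovered = pickle.loads(base64.b64decode(recovered_bytes))
```

Because the base64 alphabet is pure ASCII, a strict 'ascii' encode is lossless for valid input and fails immediately on anything else, which is the explicit error described above.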
-      return operation_specs.get_coder_from_spec(
-          json.loads(coder_proto.spec.spec.payload))
+      payload = coder_proto.spec.spec.payload
+      if isinstance(payload, bytes) and sys.version_info[0] == 3:
I think we should not be checking whether input is bytes on Python 3, and should consistently expect the same datatype as input. Can we change this to:
if sys.version_info[0] > 2:
  # json.loads() does not accept `bytes` on some versions of Python 3.
  payload = payload.decode('utf-8')
Payload is always bytes; we should unconditionally decode it before passing it to json on both Python 2 and 3.
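A minimal sketch of the unconditional decode, outside of Beam: the helper name load_json_payload and the sample payload are hypothetical, and the only assumption is that the payload is UTF-8 encoded JSON carried as bytes.

```python
import json


def load_json_payload(payload):
    """Hypothetical helper: decode a bytes payload, then parse it as JSON."""
    # json.loads() only accepts bytes from Python 3.6 onwards, so decode to
    # text first; decoding is also valid on Python 2.
    return json.loads(payload.decode('utf-8'))


payload = b'{"@type": "example_coder$abc123"}'
spec = load_json_payload(payload)
```

With no version or isinstance check, the function always expects the same input type, which keeps the calling code easier to reason about.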
Of course what would be better is if the Dataflow runner harness was fixed to always use proper coder URNs rather than stuffing json-ized dataflow v1b3 cloud proto representations into the payload. Can you find/file a JIRA for this?
sg. Also filed https://issues.apache.org/jira/browse/BEAM-6506
Force-pushed from ee9fbac to 6189694.
Force-pushed from 6189694 to a519547.
PTAL @tvalentyn @robertwb
Run Python PostCommit
LGTM, assuming post-commit tests (triggered) also pass.
This is one step toward enabling Python 3 pipelines to run on DataflowRunner. This change fixes the error reported in BEAM-5953#comment.