[BEAM-7917] Fix datastore writes failing on retry #9294
Conversation
When we call batch.begin() the status is set as IN_PROGRESS and then to FINISHED after *any* call to commit(), successful or not. commit() can only be called on a batch with status IN_PROGRESS -- it will fail with ValueError otherwise. https://github.com/googleapis/google-cloud-python/blob/master/datastore/google/cloud/datastore/batch.py https://issues.apache.org/jira/browse/BEAM-7917
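The lifecycle described above can be sketched with a minimal stand-in class (hypothetical names, not the actual google-cloud-datastore code): commit() flips the status to FINISHED even when the RPC fails, so naively retrying commit() on the same batch raises ValueError.

```python
class FakeBatch:
    """Mimics the status transitions of google.cloud.datastore.Batch."""
    IN_PROGRESS = 'IN_PROGRESS'
    FINISHED = 'FINISHED'

    def __init__(self):
        self._status = None

    def begin(self):
        self._status = self.IN_PROGRESS

    def commit(self, fail=False):
        if self._status != self.IN_PROGRESS:
            raise ValueError('Batch must be in progress to commit()')
        try:
            if fail:
                raise RuntimeError('transient RPC error')
        finally:
            # Status becomes FINISHED after *any* commit attempt,
            # successful or not.
            self._status = self.FINISHED


batch = FakeBatch()
batch.begin()
try:
    batch.commit(fail=True)  # first attempt fails transiently
except RuntimeError:
    pass
# Retrying commit() on the same batch now raises ValueError.
try:
    batch.commit()
except ValueError as e:
    print('retry failed:', e)
```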
R: @tvalentyn
R: @udim who is more familiar with this IO.
Pushed an update, please take another look. There's also no public API for committing a bunch of protobufs directly (which would be perfect for us). So what I came up with is: we do everything just like before, but we also keep the original non-protobuf entity/key so we can use those to reconstruct the batch only on retry. There should be no performance hit unless we are already failing. (I haven't actually tested this in the wild yet; will do once we're clear on the implementation.)
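A hedged sketch of the approach described in this comment: keep the original elements alongside the batch, and rebuild the batch from scratch whenever a commit attempt fails. All names here are illustrative, not the actual Beam implementation.

```python
class TransientError(Exception):
    """Stands in for a retriable RPC failure."""


def write_with_retry(make_batch, elements, max_attempts=5):
    """Commit `elements`; on failure, rebuild a fresh batch and retry."""
    for attempt in range(max_attempts):
        batch = make_batch()        # fresh batch object on every attempt
        batch.begin()
        for element in elements:    # re-populate from the kept originals
            batch.put(element)
        try:
            batch.commit()
            return
        except TransientError:
            if attempt == max_attempts - 1:
                raise


class FlakyBatch:
    """Fails the first commit, succeeds afterwards (for illustration)."""
    failures_remaining = 1

    def __init__(self):
        self.items = []

    def begin(self):
        pass

    def put(self, item):
        self.items.append(item)

    def commit(self):
        if FlakyBatch.failures_remaining > 0:
            FlakyBatch.failures_remaining -= 1
            raise TransientError('simulated RPC failure')


write_with_retry(FlakyBatch, ['entity-1', 'entity-2'])
print('committed after retry')
```

Because each attempt constructs a brand-new batch, the FINISHED-status problem never arises: only a batch that has just had begin() called on it is ever committed.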
Made a pass today. Sorry for the delay!
I believe it is possible to avoid converting and calculating more than once. See my other comments.
Yes, that's the tradeoff with using the new client.
@@ -340,12 +340,13 @@ def finish_bundle(self):

  def _init_batch(self):
    self._batch_bytes_size = 0
    self._batch = self._client.batch()
    self._batch.begin()
    self._batch_mutations = []

  def _flush_batch(self):
    # Flush the current batch of mutations to Cloud Datastore.
    latency_ms = helper.write_mutations(
Having both _batch_mutations and _batch seems redundant. I would move write_mutations into this class and create the batch object in it. You could then take advantage of add_element_to_batch to populate the batch.
Regarding ByteSize(), we could split add_element_to_batch into two parts (just a suggestion):

def element_to_client_batch_item(self, element):
  if not isinstance(element, types.Entity):
    raise ValueError('apache_beam.io.gcp.datastore.v1new.datastoreio.Entity'
                     ' expected, got: %s' % type(element))
  if not element.key.project:
    element.key.project = self._project
  client_entity = element.to_client_entity()
  if client_entity.key.is_partial:
    raise ValueError('Entities to be written to Cloud Datastore must '
                     'have complete keys:\n%s' % client_entity)
  return client_entity

def add_to_batch(self, client_batch_item):
  self._batch.put(client_batch_item)
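A hedged usage sketch of the split suggested above: convert each element once, measure the converted item's size, and flush before the batch's byte limit would be exceeded. The limit and all names below are illustrative stand-ins, not Beam's actual constants.

```python
_WRITE_BATCH_MAX_BYTES = 9 << 20  # illustrative limit, not Beam's constant


class BatchAccumulator:
    """Accumulates converted items and flushes before exceeding max_bytes."""

    def __init__(self, max_bytes=_WRITE_BATCH_MAX_BYTES):
        self._max_bytes = max_bytes
        self._bytes = 0
        self._items = []
        self.flushes = 0

    def _byte_size(self, item):
        # Stand-in for the protobuf's ByteSize(); here items are strings.
        return len(item)

    def add(self, item):
        size = self._byte_size(item)
        if self._items and self._bytes + size > self._max_bytes:
            self.flush()
        self._items.append(item)
        self._bytes += size

    def flush(self):
        # Stand-in for committing the batch to Cloud Datastore.
        self.flushes += 1
        self._items, self._bytes = [], 0


acc = BatchAccumulator(max_bytes=10)
for item in ['aaaa', 'bbbb', 'cccc']:  # third item would overflow the limit
    acc.add(item)
acc.flush()
print(acc.flushes)  # → 2
```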
I've made the related changes, but still couldn't avoid storing the batch and the raw batch elements separately. We could skip the rest of the elements (those over the byte limit) when building the batch in write_mutations and let them be picked up in the next batch (so we wouldn't purge _batch, but there could be elements left over from the previous batch) -- but that seems to create more confusion.
I think your choice is a valid compromise for what to do with the results of element_to_client_batch_item. I see two options here:
1. Save the client_element items.
2. Discard the client_element items after calling ByteSize() on them.
The first option seems more CPU efficient while the second seems more memory efficient. I don't know which option is faster, but from my limited experience saving CPU at the expense of RAM seems like a good tradeoff.
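The two options above can be sketched side by side (stand-in functions, not the actual Beam helpers): option 1 pays memory to keep each converted item, while option 2 pays CPU to convert again at flush time.

```python
def element_to_item(element):
    # Stand-in for element_to_client_batch_item(); here it just uppercases.
    return element.upper()


def byte_size(item):
    # Stand-in for the protobuf's ByteSize().
    return len(item)


# Option 1: save the converted items alongside the running byte total
# (more RAM, no second conversion at flush time).
def accumulate_cached(elements):
    items, total = [], 0
    for e in elements:
        item = element_to_item(e)
        items.append(item)
        total += byte_size(item)
    return items, total


# Option 2: measure and discard; the raw elements are converted again
# when the batch is actually built (less RAM, more CPU).
def accumulate_lean(elements):
    total = sum(byte_size(element_to_item(e)) for e in elements)
    return elements, total


print(accumulate_cached(['a', 'bb']))  # → (['A', 'BB'], 3)
print(accumulate_lean(['a', 'bb']))    # → (['a', 'bb'], 3)
```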
What are the next steps for this PR?
Another review pass would be helpful.
@udim could you please make another pass when you get a chance?
run python precommit |
Thanks! LGTM
* Fix datastore writes failing on retry
* Reconstruct datastore batch from scratch after failures
* Move write_mutations into datastoreio
When we call batch.begin() the status is set as IN_PROGRESS and then to FINISHED after any call to commit(), successful or not.
commit() can only be called on a batch with status IN_PROGRESS -- it will fail with ValueError otherwise.
I cannot say how safe it is to use those private properties of another library from here, but it fixes the issue. We could also re-create the batch from scratch (so we don't have to change that private status) -- but I'm not sure whether that would cost us any performance.
https://github.com/googleapis/google-cloud-python/blob/master/datastore/google/cloud/datastore/batch.py
https://issues.apache.org/jira/browse/BEAM-7917