[BEAM-991] Comply with byte limit for Datastore Commit. #2948
Conversation
R: @vikkyrk I thought this was deemed to be an incomplete fix, because the underlying Cloud Datastore service might replicate the bytes?
It is true that some requests complying with the limits might still hit other internal limits in the stack, but this would be rare.
This fix helps the majority of users who would see this problem. Allowing for the other cases — cases that the client cannot foresee — would need a more complex solution, but even if we implement that later, we would probably keep what this change adds: do not send requests exceeding the known limits.
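The principle described here, never sending a request that exceeds a known limit, can be sketched as a greedy batching loop. The constants and names below are illustrative stand-ins, not the connector's actual code:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: split a list of mutation payload sizes into batches that respect
// both the entity-count limit and the request byte limit. Constants are
// hypothetical stand-ins for the connector's limits.
public class BatchingSketch {
    static final int ENTITY_LIMIT = 500;               // max mutations per Commit
    static final long BYTES_LIMIT = 10 * 1024 * 1024;  // ~10MB per RPC

    /** Returns batches of payload sizes; each batch fits both limits. */
    static List<List<Long>> batch(List<Long> mutationSizes) {
        List<List<Long>> batches = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentBytes = 0;
        for (long size : mutationSizes) {
            // Flush the current batch before it would exceed either limit.
            if (!current.isEmpty()
                    && (current.size() >= ENTITY_LIMIT || currentBytes + size > BYTES_LIMIT)) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(size);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        // Twelve ~2.5MB mutations: at most four fit under the ~10MB cap.
        List<Long> sizes = new ArrayList<>();
        for (int i = 0; i < 12; ++i) {
            sizes.add(2_500_000L);
        }
        System.out.println(batch(sizes).size()); // prints 3
    }
}
```

Each batch here would correspond to one CommitRequest; a request that would exceed a known limit is never built in the first place.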
I am fine with this as long as it is well documented which cases work and which do not. Also, it would be great to add an integration test for large entities here: https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
@Test
public void testDatatoreWriterFnWithLargeEntities() throws Exception {
  List<Mutation> mutations = new ArrayList<>();
  for (int i = 0; i < 12; ++i) {
Can this be computed from DATASTORE_BATCH_UPDATE_BYTES_LIMIT, so that this test is robust against any changes in the value?
// requests, but we only need each CommitRequest to be less than 10MB in size.
int start = 0;
while (start < mutations.size()) {
  int end = Math.min(mutations.size(), start + 4);
Can 4 be computed instead?
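Both review suggestions amount to deriving the test's parameters from the limit constant rather than hard-coding 12 and 4. A hedged sketch of that idea — the constant value and per-entity size below are assumptions for illustration, not the SDK's actual values:

```java
// Sketch: derive the test's entity count and verification batch size from
// the byte limit, so the test survives changes to the limit's value.
public class DerivedTestParams {
    // Hypothetical stand-in for DATASTORE_BATCH_UPDATE_BYTES_LIMIT (~10MB).
    static final int BYTES_LIMIT = 10_000_000;

    /** Largest number of entities of the given size that fit in one request. */
    static int verificationBatchSize(int entitySizeBytes) {
        return BYTES_LIMIT / entitySizeBytes;
    }

    /** Enough entities to force at least three size-based flushes. */
    static int entityCount(int entitySizeBytes) {
        return 3 * verificationBatchSize(entitySizeBytes);
    }

    public static void main(String[] args) {
        int entitySize = 2_500_000; // assumed size of each large test entity
        System.out.println(entityCount(entitySize));          // prints 12
        System.out.println(verificationBatchSize(entitySize)); // prints 4
    }
}
```

With this shape, changing the limit constant automatically rescales both the number of entities written and the batch size used when reading them back.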
Test changes done.
Run Java PostCommit
Nice, all tests are green. Mostly LGTM, with a minor comment.
@@ -208,6 +208,14 @@
static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;

/**
 * Cloud Datastore has a limit of 10MB per RPC, so we also flush if the total size of mutations
Can you update the doc here (https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java#L183) to describe the limitations of writes, specifically the possibility of large indexed entities failing?
@@ -1152,7 +1161,14 @@ public void startBundle(StartBundleContext c) {

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
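A simplified sketch of the flush-on-size behavior this diff adds to the writer DoFn. This is not the actual DatastoreV1 code; the byte cap and structure here are illustrative assumptions:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a writer fn that tracks the running byte size of buffered
// mutations and flushes a batch before either limit would be exceeded.
public class WriterFnSketch {
    static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;  // entities per Commit
    static final long BYTES_LIMIT = 9_000_000L;           // illustrative byte cap

    private final List<Long> buffered = new ArrayList<>(); // buffered mutation sizes
    private long bufferedBytes = 0;
    int flushes = 0;                                       // exposed for the demo

    void processElement(long mutationSizeBytes) {
        // Flush first if adding this mutation would exceed the byte limit.
        if (!buffered.isEmpty() && bufferedBytes + mutationSizeBytes > BYTES_LIMIT) {
            flushBatch();
        }
        buffered.add(mutationSizeBytes);
        bufferedBytes += mutationSizeBytes;
        // Also flush on the entity-count limit, as before this change.
        if (buffered.size() >= DATASTORE_BATCH_UPDATE_LIMIT) {
            flushBatch();
        }
    }

    void flushBatch() {
        // In the real connector this would send a CommitRequest; here we count.
        flushes++;
        buffered.clear();
        bufferedBytes = 0;
    }

    public static void main(String[] args) {
        WriterFnSketch fn = new WriterFnSketch();
        for (int i = 0; i < 12; ++i) {
            fn.processElement(2_500_000L); // twelve ~2.5MB mutations
        }
        // With a 9MB cap, three mutations fit per batch before a flush is forced.
        System.out.println(fn.flushes); // prints 3
    }
}
```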
Just curious if this impacts deletes in any way, since this DoFn is used across writes and deletes.
Deletes are fine, I think, as they wouldn't send enough data per entity to trouble this limit.
Let's discuss docs separately.
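A back-of-envelope check supporting the point about deletes (the sizes below are rough assumptions, not measured values): a delete mutation carries essentially just the entity key, so even a maximal 500-mutation batch stays far below the 10MB request cap.

```java
// Rough estimate: even with a generous per-key size, a full batch of
// delete mutations is orders of magnitude below the request byte limit.
public class DeleteSizeEstimate {
    public static void main(String[] args) {
        int keyBytes = 200;        // generous assumed size of a serialized key
        int batch = 500;           // entity-count limit per Commit
        long batchBytes = (long) keyBytes * batch;    // 100,000 bytes
        System.out.println(batchBytes < 10_000_000L); // prints true
    }
}
```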
LGTM, thanks! Please sync with master, and once tests pass I will ask one of the committers to merge it.
Thanks — updated.
Thanks. @jkff could you take a pass and merge this?
This changes the Datastore connector to limit RPCs both by overall size and by the number of entities they contain, to fit within the Datastore API limits (https://cloud.google.com/datastore/docs/concepts/limits).
A similar change for the Python SDK is already written and will be submitted separately.