feat: introduce closeAsync to Batcher #1423
This should allow callers to signal that they are done using a batcher without blocking their thread.
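A minimal sketch of the calling pattern this enables, under stated assumptions: `AsyncClosable` and `NonBlockingCloseSketch` are hypothetical names, and `CompletableFuture<Void>` stands in for whatever future type the real `closeAsync()` returns, so the example needs nothing beyond the JDK. It is not the gax-java implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class NonBlockingCloseSketch {
  /** Hypothetical minimal view of a batcher that supports asynchronous close. */
  interface AsyncClosable {
    CompletableFuture<Void> closeAsync();
  }

  /** Signals close and returns right away; cleanup runs once the future completes. */
  static void closeWithoutBlocking(AsyncClosable batcher, List<String> log) {
    batcher.closeAsync().thenRun(() -> log.add("cleanup"));
    log.add("caller continues");
  }

  /** Simulates outstanding batches that finish after the caller has moved on. */
  static List<String> demo() {
    CompletableFuture<Void> pending = new CompletableFuture<>();
    List<String> log = new ArrayList<>();
    closeWithoutBlocking(() -> pending, log);
    pending.complete(null); // outstanding work finishes later
    return log;
  }
}
```

In this sketch the caller records "caller continues" before the cleanup callback runs, which is the non-blocking behavior the description asks for.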
There was a race condition of the BulkReadWrapper being cleaned up before the batch executor was done with it. This PR fixes the issue by making lifecycle management more explicit by scoping batch executors to a single method(List) invocation and making close explicit. This will be further cleaned up once googleapis/gax-java#1423 lands
elharo
left a comment
There don't seem to be any tests for the new behavior.
Added async-specific tests, which, in combination with the existing sync close tests, cover the functionality fully.
* fix: veneer adapter batching

  There was a race condition of the BulkReadWrapper being cleaned up before the batch executor was done with it. This PR fixes the issue by making lifecycle management more explicit by scoping batch executors to a single method(List) invocation and making close explicit. This will be further cleaned up once googleapis/gax-java#1423 lands
* deflake test
* use updated veneer for jwt fix
* Revert "deflake test"

  This reverts commit 386ab65.
* better error messaging
vam-google
left a comment
I'm OK with the changes, but I have a few questions to confirm that the implementation matches the original intentions.
    private final Object elementLock = new Object();
    private final Future<?> scheduledFuture;
    private volatile boolean isClosed = false;
    private SettableApiFuture<Void> closeFuture;
vam-google: Should this remain volatile? I mean the reference itself, because it seems it is checked for a non-null value without any locks in the add() method below.
igorbernstein2: I don't believe so: it can only be set by a single caller thread and is read by that same thread or in a synchronized block. But I don't see any harm in adding it, so I did.
      BatchingException cause = (BatchingException) e.getCause();
      throw new BatchingException(cause.getMessage());
    } else {
      throw new IllegalStateException("unexpected error closing the batcher", e.getCause());
vam-google: This will lose the ExecutionException itself (which may carry a useful message). Why pass e.getCause() instead of just e as the second argument? The cause would still be there as a nested exception.
igorbernstein2: ExecutionException doesn't provide any useful info here and leaks implementation details. The caller doesn't care that this was implemented with a future, but they do care that one of their elements failed.
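The unwrapping discussed in this thread can be sketched as follows. This is an illustration under stated assumptions, not the merged code: `CloseUnwrapSketch` and `closeOutcome` are hypothetical names, the nested `BatchingException` is a stand-in for the library's exception type, and `CompletableFuture<Void>` replaces the real close future so the example only needs the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class CloseUnwrapSketch {
  /** Hypothetical stand-in for the batcher's BatchingException. */
  static class BatchingException extends RuntimeException {
    BatchingException(String message) {
      super(message);
    }
  }

  /** Blocking close layered on an async close future; hides the ExecutionException wrapper. */
  static void close(CompletableFuture<Void> closeFuture) throws InterruptedException {
    try {
      closeFuture.get();
    } catch (ExecutionException e) {
      if (e.getCause() instanceof BatchingException) {
        // Surface the element failure itself, not the future plumbing.
        BatchingException cause = (BatchingException) e.getCause();
        throw new BatchingException(cause.getMessage());
      } else {
        throw new IllegalStateException("unexpected error closing the batcher", e.getCause());
      }
    }
  }

  /** Helper for illustration: reports what a caller of close() would observe. */
  static String closeOutcome(Throwable failure) {
    CompletableFuture<Void> future = new CompletableFuture<>();
    future.completeExceptionally(failure);
    try {
      close(future);
      return "closed";
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    } catch (RuntimeException e) {
      return e.getClass().getSimpleName() + ": " + e.getMessage();
    }
  }
}
```

With this shape the caller never sees `ExecutionException`; a non-batching failure is still reachable through `getCause()` on the `IllegalStateException`.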
      return closeFuture;
    }

    private void finishClose() {
vam-google: Not sure that this deserves its own private method, as it is used only once and is very short.
igorbernstein2: I inlined the method.
    boolean shouldClose = false;

    synchronized (flushLock) {
      if (numOfOutstandingBatches.decrementAndGet() == 0) {
vam-google: Maybe use <= 0 just in case, as a safer option guaranteeing we will not jump over 0 and go into negative infinity?
igorbernstein2: I think that would create weird behavior. I don't think I want waiters on the flushLock to be notified multiple times.
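To make the trade-off in this thread concrete, here is a small sketch that counts how often waiters would be woken under each comparison. Everything in it is an assumption for illustration: the class `OutstandingBatchesSketch`, the `notifyAtOrBelowZero` flag, and the `notifications` counter do not exist in the PR; only the `flushLock` / `numOfOutstandingBatches` names mirror the diff.

```java
import java.util.concurrent.atomic.AtomicInteger;

class OutstandingBatchesSketch {
  private final Object flushLock = new Object();
  private final AtomicInteger numOfOutstandingBatches = new AtomicInteger();
  private final boolean notifyAtOrBelowZero; // hypothetical switch: false is "== 0", true is "<= 0"
  private int notifications = 0; // guarded by flushLock; exists only to make wake-ups countable

  OutstandingBatchesSketch(boolean notifyAtOrBelowZero) {
    this.notifyAtOrBelowZero = notifyAtOrBelowZero;
  }

  void onBatchSent() {
    numOfOutstandingBatches.incrementAndGet();
  }

  void onBatchCompletion() {
    synchronized (flushLock) {
      int remaining = numOfOutstandingBatches.decrementAndGet();
      boolean drained = notifyAtOrBelowZero ? remaining <= 0 : remaining == 0;
      if (drained) {
        notifications++;
        flushLock.notifyAll();
      }
    }
  }

  /** Sends and completes the given number of batches, then reports how many wake-ups fired. */
  static int notificationsAfter(int sent, int completed, boolean notifyAtOrBelowZero) {
    OutstandingBatchesSketch sketch = new OutstandingBatchesSketch(notifyAtOrBelowZero);
    for (int i = 0; i < sent; i++) {
      sketch.onBatchSent();
    }
    for (int i = 0; i < completed; i++) {
      sketch.onBatchCompletion();
    }
    synchronized (sketch.flushLock) {
      return sketch.notifications;
    }
  }
}
```

If a bug ever produced more completions than sends, the `== 0` variant would still notify exactly once, while the `<= 0` variant would notify on every extra completion, which appears to be the repeated notification the reply wants to avoid.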
    if (isClosed) {
      return;
    try {
      closeAsync().get();
vam-google: This is a change in behavior: it looks like close used to not take flushLock at all, so it was non-blocking. Now it is blocking (taking flushLock via closeAsync). If that was intentional, then OK; I just wanted to point it out.
igorbernstein2: The old close impl called flush(), which called awaitAllOutstandingBatches(), which took the flush lock. So that behavior remains the same.
    boolean closeImmediately;

    synchronized (flushLock) {
vam-google: This would mean that closeAsync is still potentially blocking. The code under the lock will execute quickly, but waiting for the lock may take an arbitrarily long time. Is that OK?
igorbernstein2: I don't think there is any code here that would hold the lock for an arbitrary amount of time. All critical sections are explicitly coded to avoid blocking indefinitely.
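A rough sketch of the locking shape being discussed, assuming a single closing thread (as stated earlier in the review) and using `CompletableFuture<Void>` in place of the real future type. `CloseAsyncLockSketch`, `onBatchSent`, and `onBatchCompletion` are hypothetical names, and the real method does more work (for example, sending buffered elements). The point is only that each critical section reads or writes a few fields and never waits while holding `flushLock`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class CloseAsyncLockSketch {
  private final Object flushLock = new Object();
  private final AtomicInteger numOfOutstandingBatches = new AtomicInteger();
  private volatile CompletableFuture<Void> closeFuture;

  void onBatchSent() {
    numOfOutstandingBatches.incrementAndGet();
  }

  /** Assumes one closing thread; returns the same future on repeated calls. */
  CompletableFuture<Void> closeAsync() {
    if (closeFuture != null) {
      return closeFuture;
    }
    boolean closeImmediately;
    synchronized (flushLock) {
      // Short critical section: publish the future and sample the counter.
      closeFuture = new CompletableFuture<>();
      closeImmediately = numOfOutstandingBatches.get() == 0;
    }
    if (closeImmediately) {
      closeFuture.complete(null); // completed outside the lock
    }
    return closeFuture;
  }

  void onBatchCompletion() {
    boolean shouldClose = false;
    synchronized (flushLock) {
      if (numOfOutstandingBatches.decrementAndGet() == 0) {
        flushLock.notifyAll();
        shouldClose = closeFuture != null;
      }
    }
    if (shouldClose) {
      closeFuture.complete(null); // callbacks run without holding flushLock
    }
  }

  /** Returns {idle batcher done at once, busy batcher done before completion, done after}. */
  static boolean[] demo() {
    boolean idleDone = new CloseAsyncLockSketch().closeAsync().isDone();
    CloseAsyncLockSketch busy = new CloseAsyncLockSketch();
    busy.onBatchSent();
    CompletableFuture<Void> future = busy.closeAsync();
    boolean before = future.isDone();
    busy.onBatchCompletion();
    return new boolean[] {idleDone, before, future.isDone()};
  }
}
```

Completing the future outside the synchronized block keeps arbitrary callback code from running under the lock, which is one way to honor the "no indefinite blocking in critical sections" claim.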
igorbernstein2
left a comment
@vam-google thanks for looking. I think I addressed all of the comments, PTAL.
🤖 I have created a release \*beep\* \*boop\*

---

## [1.67.0](https://www.github.com/googleapis/gax-java/compare/v1.66.0...v1.67.0) (2021-07-19)

### Features

* introduce closeAsync to Batcher ([#1423](https://www.github.com/googleapis/gax-java/issues/1423)) ([aab5288](https://www.github.com/googleapis/gax-java/commit/aab528803405c2b5f9fc89641f47abff948a876d))
* optimize unary callables to not wait for trailers ([#1356](https://www.github.com/googleapis/gax-java/issues/1356)) ([dd5f955](https://www.github.com/googleapis/gax-java/commit/dd5f955a3ab740c677fbc6f1247094798eb814a3))
* update DirectPath environment variables ([#1412](https://www.github.com/googleapis/gax-java/issues/1412)) ([4f63b61](https://www.github.com/googleapis/gax-java/commit/4f63b61f1259936aa4a1eaf9162218c787b92f2a))

### Bug Fixes

* remove `extends ApiMessage` from `HttpJsonStubCallableFactory` definition ([#1426](https://www.github.com/googleapis/gax-java/issues/1426)) ([87636a5](https://www.github.com/googleapis/gax-java/commit/87636a5812874a77e9004aab07607121efa43736))

---

This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
This should allow callers to signal that they are done using a batcher without blocking their thread.
This will be used in bigtable-hbase to implement async batching cleanup