-
Notifications
You must be signed in to change notification settings - Fork 6
fix: threading issue in entity caching client #112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
previously, a pending update could be retrieved and modified after it has started updating, causing it to npe and leak memory
Codecov Report
@@ Coverage Diff @@
## main #112 +/- ##
============================================
+ Coverage 59.74% 59.78% +0.04%
+ Complexity 296 295 -1
============================================
Files 39 39
Lines 2971 2979 +8
Branches 368 368
============================================
+ Hits 1775 1781 +6
- Misses 1025 1026 +1
- Partials 171 172 +1
Flags with carried forward coverage won't be shown. Click here to find out more.
Continue to review full report at Codecov.
|
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
| lock.lock(); | ||
| EntityDataCachingClient.this.pendingEntityUpdates.remove(entityKey); | ||
| } finally { | ||
| lock.unlock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it ok to unlock immediately after removal or should we do the rest of the processing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be - I initially had it on the whole method, but since the write lock is exclusive it seems like it'd be better to scope it. As soon as the pending update is removed from the map, future reads should be good to go as they'll trigger the creation of a new update.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok.
| } | ||
| Single<Entity> updateResult = | ||
| EntityDataCachingClient.this.createOrUpdateEntity(entityKey, condition).cache(); | ||
| EntityDataCachingClient.this.cache.put(entityKey, updateResult); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The above method at L133 EntityDataCachingClient.this.createOrUpdateEntity(entityKey, condition).cache(); internally already adds the updated result into the cache:
private Single<Entity> createOrUpdateEntity(
EntityKey entityKey, UpsertCondition upsertCondition) {
Single<Entity> updateResult =
this.upsertEntityWithoutCaching(entityKey, upsertCondition).cache();
EntityDataCachingClient.this.cache.put(entityKey, updateResult);
return updateResult;
}
Hence the operation in L134 seems redundant.
| lock.lock(); | ||
| this.pendingEntityUpdates | ||
| .computeIfAbsent(entityKey, unused -> new PendingEntityUpdate()) | ||
| .addNewUpdate(entityKey, singleSubject, condition, maximumUpsertDelay); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Trying to understand the code. Given the read lock, multiple threads will be allowed to go ahead and perform 2 operations concurrently:
- Add a new entry within
pendingEntityUpdatesConcurrentMap (if absent). - Mutate the
pendingEntityUpdateinstance fetched from the map as part ofaddNewUpdate().
While step 1) is thread-safe given concurrentHashMap, step 2) is not thread-safe. Multiple threads may end up mutating List<SingleObserver<Entity>> responseObservers (since LinkedList is not thread-safe), along with overwriting other references like
private Disposable updateExecutionTimer; private Instant currentDeadline; private UpsertCondition condition;
We need to synchronize the mutation of pendingEntityUpdate instance, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that's certainly fair. the issues we were seeing were due to interactions between 1 + 2, but as you point out multiple addNewUpdate calls are not safe currently. Will follow up with another change in there, hopefully to support concurrency without locking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Follow up in #114
Description
previously, a pending update could be retrieved and modified after it
has started updating, causing it to npe and leak memory