Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import javax.annotation.Nonnull;
import org.hypertrace.entity.data.service.v1.Entity;
import org.hypertrace.entity.data.service.v1.EntityDataServiceGrpc;
Expand All @@ -38,8 +37,8 @@ class EntityDataCachingClient implements EntityDataClient {
private final LoadingCache<EntityKey, Single<Entity>> cache;
private final ConcurrentMap<EntityKey, PendingEntityUpdate> pendingEntityUpdates =
new ConcurrentHashMap<>();
private final Striped<ReadWriteLock> pendingUpdateStripedLock =
Striped.lazyWeakReadWriteLock(PENDING_UPDATE_MAX_LOCK_COUNT);
private final Striped<Lock> pendingUpdateStripedLock =
Striped.lazyWeakLock(PENDING_UPDATE_MAX_LOCK_COUNT);
private final Clock clock;

EntityDataCachingClient(
Expand Down Expand Up @@ -69,8 +68,8 @@ public Single<Entity> createOrUpdateEntityEventually(
SingleSubject<Entity> singleSubject = SingleSubject.create();
EntityKey entityKey = EntityKey.entityInCurrentContext(entity);

// Acquire lock allowing multiple concurrent update adds, but only if no update executions
Lock lock = this.pendingUpdateStripedLock.get(entityKey).readLock();
// Don't allow other update processing until finished
Lock lock = this.pendingUpdateStripedLock.get(entityKey);
try {
lock.lock();
this.pendingEntityUpdates
Expand Down Expand Up @@ -121,8 +120,8 @@ private class PendingEntityUpdate {
private UpsertCondition condition;

private void executeUpdate() {
// Acquire write lock to ensure no more modification of this update
Lock lock = pendingUpdateStripedLock.get(entityKey).writeLock();
// Make sure no current additions
Lock lock = pendingUpdateStripedLock.get(entityKey);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L133 seems redundant since cache update is already done within L132. Can be removed.

Copy link
Contributor Author

@aaron-steinfeld aaron-steinfeld Jul 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

two different uses of the word cache.

Single<Entity> updateResult =
          EntityDataCachingClient.this.createOrUpdateEntity(entityKey, condition).cache();

Caches in the Single - that is, all subscribers will share a result rather than re-executing
( http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Single.html#cache-- )

EntityDataCachingClient.this.cache.put(entityKey, updateResult);

puts the single itself into the guava cache for future callers

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but if we go inside:

private Single<Entity> createOrUpdateEntity(
      EntityKey entityKey, UpsertCondition upsertCondition) {
    Single<Entity> updateResult =
        this.upsertEntityWithoutCaching(entityKey, upsertCondition).cache();
    EntityDataCachingClient.this.cache.put(entityKey, updateResult);
    return updateResult;
  }

that is already adding the single within the guava cache, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for following up on this, my mind is clearly all over the place 🙁 - addressed.

try {
lock.lock();
EntityDataCachingClient.this.pendingEntityUpdates.remove(entityKey);
Expand All @@ -131,7 +130,6 @@ private void executeUpdate() {
}
Single<Entity> updateResult =
EntityDataCachingClient.this.createOrUpdateEntity(entityKey, condition).cache();
EntityDataCachingClient.this.cache.put(entityKey, updateResult);

responseObservers.forEach(updateResult::subscribe);
}
Expand Down