From 2374061b5df25803ff2ce95e2c27ab6c35d06aae Mon Sep 17 00:00:00 2001 From: Aaron Steinfeld Date: Wed, 21 Jul 2021 08:03:49 -0400 Subject: [PATCH 1/2] fix: address more concurrency concerns in caching client --- .../service/rxclient/EntityDataCachingClient.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/entity-data-service-rx-client/src/main/java/org/hypertrace/entity/data/service/rxclient/EntityDataCachingClient.java b/entity-data-service-rx-client/src/main/java/org/hypertrace/entity/data/service/rxclient/EntityDataCachingClient.java index 62993aa1..b365461e 100644 --- a/entity-data-service-rx-client/src/main/java/org/hypertrace/entity/data/service/rxclient/EntityDataCachingClient.java +++ b/entity-data-service-rx-client/src/main/java/org/hypertrace/entity/data/service/rxclient/EntityDataCachingClient.java @@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; import javax.annotation.Nonnull; import org.hypertrace.entity.data.service.v1.Entity; import org.hypertrace.entity.data.service.v1.EntityDataServiceGrpc; @@ -38,8 +37,8 @@ class EntityDataCachingClient implements EntityDataClient { private final LoadingCache> cache; private final ConcurrentMap pendingEntityUpdates = new ConcurrentHashMap<>(); - private final Striped pendingUpdateStripedLock = - Striped.lazyWeakReadWriteLock(PENDING_UPDATE_MAX_LOCK_COUNT); + private final Striped pendingUpdateStripedLock = + Striped.lazyWeakLock(PENDING_UPDATE_MAX_LOCK_COUNT); private final Clock clock; EntityDataCachingClient( @@ -69,8 +68,8 @@ public Single createOrUpdateEntityEventually( SingleSubject singleSubject = SingleSubject.create(); EntityKey entityKey = EntityKey.entityInCurrentContext(entity); - // Acquire lock allowing multiple concurrent update adds, but only if no update executions - Lock lock = this.pendingUpdateStripedLock.get(entityKey).readLock(); + // Don't allow other update processing until finished + Lock lock = this.pendingUpdateStripedLock.get(entityKey); try { lock.lock(); this.pendingEntityUpdates @@ -121,8 +120,8 @@ private class PendingEntityUpdate { private UpsertCondition condition; private void executeUpdate() { - // Acquire write lock to ensure no more modification of this update - Lock lock = pendingUpdateStripedLock.get(entityKey).writeLock(); + // Make sure no current additions + Lock lock = pendingUpdateStripedLock.get(entityKey); try { lock.lock(); EntityDataCachingClient.this.pendingEntityUpdates.remove(entityKey); From 7cc0defa9d008695f00e319050fa8d96db1db608 Mon Sep 17 00:00:00 2001 From: Aaron Steinfeld Date: Wed, 21 Jul 2021 08:43:50 -0400 Subject: [PATCH 2/2] chore: remove redundant cache put --- .../entity/data/service/rxclient/EntityDataCachingClient.java | 1 - 1 file changed, 1 deletion(-) diff --git a/entity-data-service-rx-client/src/main/java/org/hypertrace/entity/data/service/rxclient/EntityDataCachingClient.java b/entity-data-service-rx-client/src/main/java/org/hypertrace/entity/data/service/rxclient/EntityDataCachingClient.java index b365461e..f9e1bb49 100644 --- a/entity-data-service-rx-client/src/main/java/org/hypertrace/entity/data/service/rxclient/EntityDataCachingClient.java +++ b/entity-data-service-rx-client/src/main/java/org/hypertrace/entity/data/service/rxclient/EntityDataCachingClient.java @@ -130,7 +130,6 @@ private void executeUpdate() { } Single updateResult = EntityDataCachingClient.this.createOrUpdateEntity(entityKey, condition).cache(); - EntityDataCachingClient.this.cache.put(entityKey, updateResult); responseObservers.forEach(updateResult::subscribe); }