Merged
@@ -6,6 +6,7 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.Striped;
import io.grpc.CallCredentials;
import io.grpc.Channel;
import io.reactivex.rxjava3.core.Completable;
@@ -21,6 +22,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
import javax.annotation.Nonnull;
import org.hypertrace.entity.data.service.v1.Entity;
import org.hypertrace.entity.data.service.v1.EntityDataServiceGrpc;
@@ -30,10 +33,13 @@
import org.hypertrace.entity.data.service.v1.MergeAndUpsertEntityResponse;

class EntityDataCachingClient implements EntityDataClient {
+private static final int PENDING_UPDATE_MAX_LOCK_COUNT = 1000;
private final EntityDataServiceStub entityDataClient;
private final LoadingCache<EntityKey, Single<Entity>> cache;
private final ConcurrentMap<EntityKey, PendingEntityUpdate> pendingEntityUpdates =
new ConcurrentHashMap<>();
+private final Striped<ReadWriteLock> pendingUpdateStripedLock =
+    Striped.lazyWeakReadWriteLock(PENDING_UPDATE_MAX_LOCK_COUNT);
private final Clock clock;

EntityDataCachingClient(
@@ -62,14 +68,17 @@ public Single<Entity> createOrUpdateEntityEventually(
Entity entity, UpsertCondition condition, Duration maximumUpsertDelay) {
SingleSubject<Entity> singleSubject = SingleSubject.create();
EntityKey entityKey = EntityKey.entityInCurrentContext(entity);
-if (this.pendingEntityUpdates.containsKey(entityKey)) {
-  // Update the key - not strictly necessary (because the pending update holds the write data),
-  // but good hygiene
-  this.pendingEntityUpdates.put(entityKey, this.pendingEntityUpdates.get(entityKey));

+// Acquire lock allowing multiple concurrent update adds, but only if no update executions
+Lock lock = this.pendingUpdateStripedLock.get(entityKey).readLock();
+try {
+  lock.lock();
+  this.pendingEntityUpdates
+      .computeIfAbsent(entityKey, unused -> new PendingEntityUpdate())
+      .addNewUpdate(entityKey, singleSubject, condition, maximumUpsertDelay);

Contributor

Trying to understand the code: given the read lock, multiple threads are allowed to perform two operations concurrently:

  1. Add a new entry to the pendingEntityUpdates ConcurrentMap (if absent).
  2. Mutate the PendingEntityUpdate instance fetched from the map, via addNewUpdate().

While step 1 is thread-safe thanks to ConcurrentHashMap, step 2 is not: multiple threads may end up mutating List<SingleObserver<Entity>> responseObservers (LinkedList is not thread-safe), as well as overwriting other references such as updateExecutionTimer, currentDeadline, and condition.

We need to synchronize the mutation of the PendingEntityUpdate instance, right?
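For illustration, one way to make step 2 safe is to guard the observer list with the instance monitor. A minimal stdlib sketch (hypothetical PendingUpdate type standing in for the project's PendingEntityUpdate, not the fix that actually landed):

```java
import java.util.ArrayList;
import java.util.List;

public class PendingUpdateSketch {

  // Hypothetical simplification of PendingEntityUpdate: the observer list is
  // guarded by the instance monitor, so concurrent addNewUpdate calls are safe.
  static class PendingUpdate {
    private final List<String> responseObservers = new ArrayList<>();

    synchronized void addNewUpdate(String observer) {
      responseObservers.add(observer);
    }

    synchronized int observerCount() {
      return responseObservers.size();
    }
  }

  // Runs N threads that each register one observer; with synchronization,
  // none of the adds are lost.
  static int concurrentAdds(int threads) {
    PendingUpdate update = new PendingUpdate();
    List<Thread> workers = new ArrayList<>();
    for (int i = 0; i < threads; i++) {
      final int id = i;
      workers.add(new Thread(() -> update.addNewUpdate("observer-" + id)));
    }
    workers.forEach(Thread::start);
    workers.forEach(t -> {
      try {
        t.join();
      } catch (InterruptedException e) {
        throw new IllegalStateException(e);
      }
    });
    return update.observerCount();
  }

  public static void main(String[] args) {
    System.out.println(concurrentAdds(32)); // prints 32: no lost adds
  }
}
```

With a plain unsynchronized ArrayList or LinkedList, the same test can lose entries under contention, which is exactly the hazard described above.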

Contributor Author

Yeah, that's certainly fair. The issues we were seeing were due to interactions between 1 and 2, but as you point out, multiple concurrent addNewUpdate calls are not currently safe either. Will follow up with another change there, hopefully supporting concurrency without locking.

Contributor Author

Follow up in #114

+} finally {
+  lock.unlock();
+}
-this.pendingEntityUpdates
-    .computeIfAbsent(entityKey, unused -> new PendingEntityUpdate())
-    .addNewUpdate(entityKey, singleSubject, condition, maximumUpsertDelay);
return singleSubject;
}
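The scheme the new comment describes ("multiple concurrent update adds, but only if no update executions") maps directly onto ReadWriteLock semantics: any number of read-lock holders (the add path) proceed together, while the write lock (the execute path) is exclusive. A small stdlib demonstration with ReentrantReadWriteLock (Guava's Striped just maintains many such locks keyed by hash, so contention is per entity key rather than global):

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockDemo {

  // True if a second thread can take the read lock while we already hold it:
  // concurrent "add" paths do not block each other.
  static boolean secondReaderAcquires(ReadWriteLock lock) {
    lock.readLock().lock();
    try {
      return tryReadLockInOtherThread(lock);
    } finally {
      lock.readLock().unlock();
    }
  }

  // True if a reader can take the read lock while the write lock is held:
  // expected false, since the "execute" path is exclusive.
  static boolean readerAcquiresWhileWriteHeld(ReadWriteLock lock) {
    lock.writeLock().lock();
    try {
      return tryReadLockInOtherThread(lock);
    } finally {
      lock.writeLock().unlock();
    }
  }

  private static boolean tryReadLockInOtherThread(ReadWriteLock lock) {
    final boolean[] acquired = {false};
    Thread t = new Thread(() -> {
      if (lock.readLock().tryLock()) {
        acquired[0] = true;
        lock.readLock().unlock(); // release so later acquisitions aren't blocked
      }
    });
    t.start();
    try {
      t.join();
    } catch (InterruptedException e) {
      throw new IllegalStateException(e);
    }
    return acquired[0];
  }

  public static void main(String[] args) {
    ReadWriteLock lock = new ReentrantReadWriteLock();
    System.out.println(secondReaderAcquires(lock));         // true
    System.out.println(readerAcquiresWhileWriteHeld(lock)); // false
  }
}
```

Note that a read lock provides no mutual exclusion among the readers themselves, which is the thread-safety gap raised in the conversation above.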

@@ -112,9 +121,14 @@ private class PendingEntityUpdate {
private UpsertCondition condition;

private void executeUpdate() {

-EntityDataCachingClient.this.pendingEntityUpdates.remove(entityKey);

+// Acquire write lock to ensure no more modification of this update
+Lock lock = pendingUpdateStripedLock.get(entityKey).writeLock();
+try {
+  lock.lock();
+  EntityDataCachingClient.this.pendingEntityUpdates.remove(entityKey);
+} finally {
+  lock.unlock();
Contributor

Is it OK to unlock immediately after the removal, or should we hold the lock for the rest of the processing?

Contributor Author

It should be. I initially had the lock around the whole method, but since the write lock is exclusive, it seemed better to scope it down. As soon as the pending update is removed from the map, future reads are good to go: they'll trigger the creation of a new update.

Contributor

ok.
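The scoping discussed above can be sketched with stdlib types (hypothetical names; the real client stripes one lock per entity key): the write lock covers only the map removal, and the slow upsert runs after release, so a new add for the same key immediately starts a fresh pending update.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ScopedWriteLockSketch {

  static final ConcurrentMap<String, Object> pendingUpdates = new ConcurrentHashMap<>();
  static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  // The exclusive section covers only the removal; the (potentially slow)
  // upsert RPC would run after unlock, mirroring the change under review.
  static Object executeUpdate(String key) {
    Object removed;
    Lock write = lock.writeLock();
    write.lock();
    try {
      removed = pendingUpdates.remove(key);
    } finally {
      write.unlock();
    }
    // ... slow network call happens here, outside the exclusive section ...
    return removed;
  }

  // Round trip: execute removes the pending update, and a subsequent add
  // for the same key creates a brand-new one.
  static boolean demoRoundTrip() {
    Object first = pendingUpdates.computeIfAbsent("service-1", k -> new Object());
    Object executed = executeUpdate("service-1");
    Object second = pendingUpdates.computeIfAbsent("service-1", k -> new Object());
    return first == executed && first != second;
  }

  public static void main(String[] args) {
    System.out.println(demoRoundTrip()); // true
  }
}
```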

+}
Single<Entity> updateResult =
EntityDataCachingClient.this.createOrUpdateEntity(entityKey, condition).cache();
EntityDataCachingClient.this.cache.put(entityKey, updateResult);
@satish-mittal Jul 21, 2021

The method called above, EntityDataCachingClient.this.createOrUpdateEntity(entityKey, condition).cache(), already adds the updated result to the cache internally:

  private Single<Entity> createOrUpdateEntity(
      EntityKey entityKey, UpsertCondition upsertCondition) {
    Single<Entity> updateResult =
        this.upsertEntityWithoutCaching(entityKey, upsertCondition).cache();
    EntityDataCachingClient.this.cache.put(entityKey, updateResult);
    return updateResult;
  }

Hence the explicit cache.put(entityKey, updateResult) above seems redundant.
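The redundancy can be seen in a stdlib analogue (CompletableFuture standing in for RxJava's cached Single; all names hypothetical): the create-or-update helper already stores the in-flight result in the map, so storing the same object again afterwards changes nothing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ResultCachingSketch {

  static final ConcurrentMap<String, CompletableFuture<String>> cache =
      new ConcurrentHashMap<>();
  static final AtomicInteger upsertCalls = new AtomicInteger();

  // Analogue of createOrUpdateEntity: runs the upsert once and caches the
  // in-flight future itself, so all readers share a single result.
  static CompletableFuture<String> createOrUpdate(String key) {
    CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> {
      upsertCalls.incrementAndGet(); // the actual RPC would happen here
      return "entity-" + key;
    });
    cache.put(key, result); // caching already happens inside the helper
    return result;
  }

  static boolean secondPutIsRedundant() {
    CompletableFuture<String> result = createOrUpdate("service-1");
    cache.put("service-1", result); // the redundant put flagged in review
    result.join();
    // Same future, and the upsert still ran exactly once.
    return cache.get("service-1") == result && upsertCalls.get() == 1;
  }

  public static void main(String[] args) {
    System.out.println(secondPutIsRedundant()); // true
  }
}
```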

@@ -127,6 +141,10 @@ private void addNewUpdate(
SingleObserver<Entity> observer,
UpsertCondition condition,
Duration maximumDelay) {
+if (nonNull(updateExecutionTimer) && updateExecutionTimer.isDisposed()) {
+  throw new IllegalStateException("Attempting to add new update after execution");
+}

this.entityKey = entityKey;
this.condition = condition;
this.responseObservers.add(observer);
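The intent of the new guard (fail fast if an add races past execution) can be sketched with a simple flag standing in for updateExecutionTimer.isDisposed() (a hypothetical simplification, not the project's class):

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class ExecutionGuardSketch {

  // Hypothetical stand-in for PendingEntityUpdate: once the update has
  // executed, further addNewUpdate calls should fail fast instead of
  // registering observers that will never be notified.
  static class PendingUpdate {
    private final AtomicBoolean executed = new AtomicBoolean(false);

    void addNewUpdate(String observer) {
      if (executed.get()) {
        throw new IllegalStateException("Attempting to add new update after execution");
      }
      // ... register observer, (re)schedule the execution timer ...
    }

    void executeUpdate() {
      executed.set(true);
    }
  }

  // True if a late add is rejected with IllegalStateException.
  static boolean rejectsLateAdd() {
    PendingUpdate update = new PendingUpdate();
    update.addNewUpdate("first"); // accepted before execution
    update.executeUpdate();
    try {
      update.addNewUpdate("late");
      return false;
    } catch (IllegalStateException expected) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(rejectsLateAdd()); // true
  }
}
```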