Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .snyk
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ ignore:
SNYK-JAVA-IONETTY-1042268:
- '*':
reason: None Given
expires: 2021-07-31T00:00:00.000Z
expires: 2021-10-31T00:00:00.000Z
patch: {}
3 changes: 3 additions & 0 deletions entity-data-service-rx-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ dependencies {

implementation("org.slf4j:slf4j-api:1.7.30")
implementation("com.google.guava:guava:30.1.1-jre")
implementation("org.slf4j:slf4j-api:1.7.30")
annotationProcessor("org.projectlombok:lombok:1.18.18")
compileOnly("org.projectlombok:lombok:1.18.18")

testImplementation("org.junit.jupiter:junit-jupiter:5.7.1")
testImplementation("org.mockito:mockito-core:3.8.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,26 @@
import io.grpc.Channel;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleObserver;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subjects.SingleSubject;
import io.reactivex.rxjava3.internal.functions.Functions;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.hypertrace.core.grpcutils.context.RequestContext;
import org.hypertrace.entity.data.service.v1.Entity;
import org.hypertrace.entity.data.service.v1.EntityDataServiceGrpc;
import org.hypertrace.entity.data.service.v1.EntityDataServiceGrpc.EntityDataServiceStub;
import org.hypertrace.entity.data.service.v1.MergeAndUpsertEntityRequest;
import org.hypertrace.entity.data.service.v1.MergeAndUpsertEntityRequest.UpsertCondition;
import org.hypertrace.entity.data.service.v1.MergeAndUpsertEntityResponse;

@Slf4j
class EntityDataCachingClient implements EntityDataClient {
private static final int PENDING_UPDATE_MAX_LOCK_COUNT = 1000;
private final EntityDataServiceStub entityDataClient;
Expand All @@ -57,33 +57,29 @@ class EntityDataCachingClient implements EntityDataClient {
}

@Override
public Single<Entity> getOrCreateEntity(Entity entity) {
EntityKey entityKey = EntityKey.entityInCurrentContext(entity);
return this.cache.getUnchecked(entityKey).doOnError(x -> this.cache.invalidate(entityKey));
}

@Override
public Single<Entity> createOrUpdateEntityEventually(
Entity entity, UpsertCondition condition, Duration maximumUpsertDelay) {
SingleSubject<Entity> singleSubject = SingleSubject.create();
EntityKey entityKey = EntityKey.entityInCurrentContext(entity);
public void createOrUpdateEntityEventually(
RequestContext requestContext,
Entity entity,
UpsertCondition condition,
Duration maximumUpsertDelay) {
EntityKey entityKey = new EntityKey(requestContext, entity);

// Don't allow other update processing until finished
Lock lock = this.pendingUpdateStripedLock.get(entityKey);
try {
lock.lock();
this.pendingEntityUpdates
.computeIfAbsent(entityKey, unused -> new PendingEntityUpdate())
.addNewUpdate(entityKey, singleSubject, condition, maximumUpsertDelay);
.addNewUpdate(entityKey, condition, maximumUpsertDelay);
} finally {
lock.unlock();
}
return singleSubject;
}

@Override
public Single<Entity> createOrUpdateEntity(Entity entity, UpsertCondition upsertCondition) {
return this.createOrUpdateEntity(EntityKey.entityInCurrentContext(entity), upsertCondition);
public Single<Entity> createOrUpdateEntity(
RequestContext requestContext, Entity entity, UpsertCondition upsertCondition) {
return this.createOrUpdateEntity(new EntityKey(requestContext, entity), upsertCondition);
}

private Single<Entity> createOrUpdateEntity(
Expand Down Expand Up @@ -113,7 +109,6 @@ private Single<Entity> upsertEntityWithoutCaching(EntityKey key, UpsertCondition
}

private class PendingEntityUpdate {
private final List<SingleObserver<Entity>> responseObservers = new LinkedList<>();
private EntityKey entityKey;
private Disposable updateExecutionTimer;
private Instant currentDeadline;
Expand All @@ -128,24 +123,20 @@ private void executeUpdate() {
} finally {
lock.unlock();
}
Single<Entity> updateResult =
EntityDataCachingClient.this.createOrUpdateEntity(entityKey, condition).cache();

responseObservers.forEach(updateResult::subscribe);
EntityDataCachingClient.this
.createOrUpdateEntity(entityKey, condition)
.blockingSubscribe(
Functions.emptyConsumer(), error -> log.error("Error upserting entity", error));
}

private void addNewUpdate(
EntityKey entityKey,
SingleObserver<Entity> observer,
UpsertCondition condition,
Duration maximumDelay) {
EntityKey entityKey, UpsertCondition condition, Duration maximumDelay) {
if (nonNull(updateExecutionTimer) && updateExecutionTimer.isDisposed()) {
throw new IllegalStateException("Attempting to add new update after execution");
}

this.entityKey = entityKey;
this.condition = condition;
this.responseObservers.add(observer);
Instant newDeadline = EntityDataCachingClient.this.clock.instant().plus(maximumDelay);
if (isNull(currentDeadline) || this.currentDeadline.isAfter(newDeadline)) {
this.currentDeadline = newDeadline;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.Objects;
import javax.annotation.Nonnull;
import org.hypertrace.core.grpcutils.client.RequestContextClientCallCredsProviderFactory;
import org.hypertrace.core.grpcutils.context.RequestContext;
import org.hypertrace.entity.data.service.v1.Entity;
import org.hypertrace.entity.data.service.v1.MergeAndUpsertEntityRequest.UpsertCondition;

Expand All @@ -18,23 +19,12 @@
*/
public interface EntityDataClient {

/**
* Gets the entity from the cache if available, otherwise upserts it and returns the result. The
* behavior of this may or may not be cached depending on the configuration.
*
* @param entity
* @return
*/
@Deprecated
Single<Entity> getOrCreateEntity(Entity entity);

/**
* Performs a throttled update of the provided entity, starting after no longer than the provided
* maximumUpsertDelay. If newer candidates for the same entity are received before this update
* occurs, the value of the newest value (and its condition, if any) will be used instead. This
* allows a high number of potentially repetitive entity upserts to be processed without creating
* excessive overhead. Each returned single will propagate the result of the server call that
* eventually satisfies its deadline.
* excessive overhead.
*
* <p>Example:
*
Expand All @@ -46,28 +36,32 @@ public interface EntityDataClient {
* </ol>
*
* At t=400ms (the deadline for the second invocation) entity-1 is upserted, using the most recent
* values (the fourth invocation - entity-1.v3). When this returns, the result will be given to
* the first, second and fourth invocations (the ones for that entity). At t=500ms, the deadline
* for the third invocation entity-2 is upserted and returned to the third invocation.
* values (the fourth invocation - entity-1.v3). At t=500ms, the deadline for the third invocation
* entity-2 is upserted.
*
* @param requestContext
* @param entity
* @param upsertCondition
* @param maximumUpsertDelay
* @return
*/
Single<Entity> createOrUpdateEntityEventually(
Entity entity, UpsertCondition upsertCondition, Duration maximumUpsertDelay);
void createOrUpdateEntityEventually(
RequestContext requestContext,
Entity entity,
UpsertCondition upsertCondition,
Duration maximumUpsertDelay);

/**
* Immediately creates or updates, merging with any existing data, the provided entity if the
* provided condition is met. The new value is returned if the condition is met, else if the
* condition is not met, the existing value is instead returned.
*
* @param requestContext
* @param entity
* @param upsertCondition
* @return The resulting entity
*/
Single<Entity> createOrUpdateEntity(Entity entity, UpsertCondition upsertCondition);
Single<Entity> createOrUpdateEntity(
RequestContext requestContext, Entity entity, UpsertCondition upsertCondition);

static Builder builder(@Nonnull Channel channel) {
return new Builder(Objects.requireNonNull(channel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ class EntityKey {

private static final String DEFAULT_TENANT_ID = "default";

static EntityKey entityInCurrentContext(Entity inputEntity) {
return new EntityKey(requireNonNull(RequestContext.CURRENT.get()), requireNonNull(inputEntity));
}

private final Entity inputEntity;
private final String tenantId;
private final GrpcRxExecutionContext executionContext;
Expand All @@ -32,7 +28,7 @@ static EntityKey entityInCurrentContext(Entity inputEntity) {
? entity.getIdentifyingAttributesMap()
: entity.getEntityId());

protected EntityKey(@Nonnull RequestContext requestContext, @Nonnull Entity inputEntity) {
EntityKey(@Nonnull RequestContext requestContext, @Nonnull Entity inputEntity) {
requireNonNull(inputEntity.getEntityId());
requireNonNull(inputEntity.getEntityType());
this.executionContext = GrpcRxExecutionContext.forContext(requestContext);
Expand Down
Loading