diff --git a/.snyk b/.snyk index d0e59374..56ab1f94 100644 --- a/.snyk +++ b/.snyk @@ -5,5 +5,5 @@ ignore: SNYK-JAVA-IONETTY-1042268: - '*': reason: None Given - expires: 2021-07-31T00:00:00.000Z + expires: 2021-10-31T00:00:00.000Z patch: {} diff --git a/entity-data-service-rx-client/build.gradle.kts b/entity-data-service-rx-client/build.gradle.kts index 7e004dd8..335db586 100644 --- a/entity-data-service-rx-client/build.gradle.kts +++ b/entity-data-service-rx-client/build.gradle.kts @@ -18,6 +18,9 @@ dependencies { implementation("org.slf4j:slf4j-api:1.7.30") implementation("com.google.guava:guava:30.1.1-jre") + implementation("org.slf4j:slf4j-api:1.7.30") + annotationProcessor("org.projectlombok:lombok:1.18.18") + compileOnly("org.projectlombok:lombok:1.18.18") testImplementation("org.junit.jupiter:junit-jupiter:5.7.1") testImplementation("org.mockito:mockito-core:3.8.0") diff --git a/entity-data-service-rx-client/src/main/java/org/hypertrace/entity/data/service/rxclient/EntityDataCachingClient.java b/entity-data-service-rx-client/src/main/java/org/hypertrace/entity/data/service/rxclient/EntityDataCachingClient.java index f9e1bb49..8689a1ad 100644 --- a/entity-data-service-rx-client/src/main/java/org/hypertrace/entity/data/service/rxclient/EntityDataCachingClient.java +++ b/entity-data-service-rx-client/src/main/java/org/hypertrace/entity/data/service/rxclient/EntityDataCachingClient.java @@ -11,19 +11,18 @@ import io.grpc.Channel; import io.reactivex.rxjava3.core.Completable; import io.reactivex.rxjava3.core.Single; -import io.reactivex.rxjava3.core.SingleObserver; import io.reactivex.rxjava3.disposables.Disposable; -import io.reactivex.rxjava3.subjects.SingleSubject; +import io.reactivex.rxjava3.internal.functions.Functions; import java.time.Clock; import java.time.Duration; import java.time.Instant; -import java.util.LinkedList; -import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import javax.annotation.Nonnull; +import lombok.extern.slf4j.Slf4j; +import org.hypertrace.core.grpcutils.context.RequestContext; import org.hypertrace.entity.data.service.v1.Entity; import org.hypertrace.entity.data.service.v1.EntityDataServiceGrpc; import org.hypertrace.entity.data.service.v1.EntityDataServiceGrpc.EntityDataServiceStub; @@ -31,6 +30,7 @@ import org.hypertrace.entity.data.service.v1.MergeAndUpsertEntityRequest.UpsertCondition; import org.hypertrace.entity.data.service.v1.MergeAndUpsertEntityResponse; +@Slf4j class EntityDataCachingClient implements EntityDataClient { private static final int PENDING_UPDATE_MAX_LOCK_COUNT = 1000; private final EntityDataServiceStub entityDataClient; @@ -57,16 +57,12 @@ class EntityDataCachingClient implements EntityDataClient { } @Override - public Single getOrCreateEntity(Entity entity) { - EntityKey entityKey = EntityKey.entityInCurrentContext(entity); - return this.cache.getUnchecked(entityKey).doOnError(x -> this.cache.invalidate(entityKey)); - } - - @Override - public Single createOrUpdateEntityEventually( - Entity entity, UpsertCondition condition, Duration maximumUpsertDelay) { - SingleSubject singleSubject = SingleSubject.create(); - EntityKey entityKey = EntityKey.entityInCurrentContext(entity); + public void createOrUpdateEntityEventually( + RequestContext requestContext, + Entity entity, + UpsertCondition condition, + Duration maximumUpsertDelay) { + EntityKey entityKey = new EntityKey(requestContext, entity); // Don't allow other update processing until finished Lock lock = this.pendingUpdateStripedLock.get(entityKey); @@ -74,16 +70,16 @@ public Single createOrUpdateEntityEventually( lock.lock(); this.pendingEntityUpdates .computeIfAbsent(entityKey, unused -> new PendingEntityUpdate()) - .addNewUpdate(entityKey, singleSubject, condition, maximumUpsertDelay); + .addNewUpdate(entityKey, condition, maximumUpsertDelay); } finally { lock.unlock(); } - return singleSubject; } @Override - public Single createOrUpdateEntity(Entity entity, UpsertCondition upsertCondition) { - return this.createOrUpdateEntity(EntityKey.entityInCurrentContext(entity), upsertCondition); + public Single createOrUpdateEntity( + RequestContext requestContext, Entity entity, UpsertCondition upsertCondition) { + return this.createOrUpdateEntity(new EntityKey(requestContext, entity), upsertCondition); } private Single createOrUpdateEntity( @@ -113,7 +109,6 @@ private Single upsertEntityWithoutCaching(EntityKey key, UpsertCondition } private class PendingEntityUpdate { - private final List> responseObservers = new LinkedList<>(); private EntityKey entityKey; private Disposable updateExecutionTimer; private Instant currentDeadline; @@ -128,24 +123,20 @@ private void executeUpdate() { } finally { lock.unlock(); } - Single updateResult = - EntityDataCachingClient.this.createOrUpdateEntity(entityKey, condition).cache(); - - responseObservers.forEach(updateResult::subscribe); + EntityDataCachingClient.this + .createOrUpdateEntity(entityKey, condition) + .blockingSubscribe( + Functions.emptyConsumer(), error -> log.error("Error upserting entity", error)); } private void addNewUpdate( - EntityKey entityKey, - SingleObserver observer, - UpsertCondition condition, - Duration maximumDelay) { + EntityKey entityKey, UpsertCondition condition, Duration maximumDelay) { if (nonNull(updateExecutionTimer) && updateExecutionTimer.isDisposed()) { throw new IllegalStateException("Attempting to add new update after execution"); } this.entityKey = entityKey; this.condition = condition; - this.responseObservers.add(observer); Instant newDeadline = EntityDataCachingClient.this.clock.instant().plus(maximumDelay); if (isNull(currentDeadline) || this.currentDeadline.isAfter(newDeadline)) { this.currentDeadline = newDeadline; diff --git a/entity-data-service-rx-client/src/main/java/org/hypertrace/entity/data/service/rxclient/EntityDataClient.java b/entity-data-service-rx-client/src/main/java/org/hypertrace/entity/data/service/rxclient/EntityDataClient.java index e358f567..2fa6d61e 100644 --- a/entity-data-service-rx-client/src/main/java/org/hypertrace/entity/data/service/rxclient/EntityDataClient.java +++ b/entity-data-service-rx-client/src/main/java/org/hypertrace/entity/data/service/rxclient/EntityDataClient.java @@ -9,6 +9,7 @@ import java.util.Objects; import javax.annotation.Nonnull; import org.hypertrace.core.grpcutils.client.RequestContextClientCallCredsProviderFactory; +import org.hypertrace.core.grpcutils.context.RequestContext; import org.hypertrace.entity.data.service.v1.Entity; import org.hypertrace.entity.data.service.v1.MergeAndUpsertEntityRequest.UpsertCondition; @@ -18,23 +19,12 @@ */ public interface EntityDataClient { - /** - * Gets the entity from the cache if available, otherwise upserts it and returns the result. The - * behavior of this may or may not be cached depending on the configuration. - * - * @param entity - * @return - */ - @Deprecated - Single getOrCreateEntity(Entity entity); - /** * Performs a throttled update of the provided entity, starting after no longer than the provided * maximumUpsertDelay. If newer candidates for the same entity are received before this update * occurs, the value of the newest value (and its condition, if any) will be used instead. This * allows a high number of potentially repetitive entity upserts to be processed without creating - * excessive overhead. Each returned single will propagate the result of the server call that - * eventually satisfies its deadline. + * excessive overhead. * *

Example: * @@ -46,28 +36,32 @@ public interface EntityDataClient { * * * At t=400ms (the deadline for the second invocation) entity-1 is upserted, using the most recent - * values (the fourth invocation - entity-1.v3). When this returns, the result will be given to - * the first, second and fourth invocations (the ones for that entity). At t=500ms, the deadline - * for the third invocation entity-2 is upserted and returned to the third invocation. + * values (the fourth invocation - entity-1.v3). At t=500ms, the deadline for the third invocation + * entity-2 is upserted. * + * @param requestContext * @param entity * @param upsertCondition * @param maximumUpsertDelay - * @return */ - Single createOrUpdateEntityEventually( - Entity entity, UpsertCondition upsertCondition, Duration maximumUpsertDelay); + void createOrUpdateEntityEventually( + RequestContext requestContext, + Entity entity, + UpsertCondition upsertCondition, + Duration maximumUpsertDelay); /** * Immediately creates or updates, merging with any existing data, the provided entity if the * provided condition is met. The new value is returned if the condition is met, else if the * condition is not met, the existing value is instead returned. * + * @param requestContext * @param entity * @param upsertCondition * @return The resulting entity */ - Single createOrUpdateEntity(Entity entity, UpsertCondition upsertCondition); + Single createOrUpdateEntity( + RequestContext requestContext, Entity entity, UpsertCondition upsertCondition); static Builder builder(@Nonnull Channel channel) { return new Builder(Objects.requireNonNull(channel)); diff --git a/entity-data-service-rx-client/src/main/java/org/hypertrace/entity/data/service/rxclient/EntityKey.java b/entity-data-service-rx-client/src/main/java/org/hypertrace/entity/data/service/rxclient/EntityKey.java index 8f2c4624..36d6ffdc 100644 --- a/entity-data-service-rx-client/src/main/java/org/hypertrace/entity/data/service/rxclient/EntityKey.java +++ b/entity-data-service-rx-client/src/main/java/org/hypertrace/entity/data/service/rxclient/EntityKey.java @@ -17,10 +17,6 @@ class EntityKey { private static final String DEFAULT_TENANT_ID = "default"; - static EntityKey entityInCurrentContext(Entity inputEntity) { - return new EntityKey(requireNonNull(RequestContext.CURRENT.get()), requireNonNull(inputEntity)); - } - private final Entity inputEntity; private final String tenantId; private final GrpcRxExecutionContext executionContext; @@ -32,7 +28,7 @@ static EntityKey entityInCurrentContext(Entity inputEntity) { ? entity.getIdentifyingAttributesMap() : entity.getEntityId()); - protected EntityKey(@Nonnull RequestContext requestContext, @Nonnull Entity inputEntity) { + EntityKey(@Nonnull RequestContext requestContext, @Nonnull Entity inputEntity) { requireNonNull(inputEntity.getEntityId()); requireNonNull(inputEntity.getEntityType()); this.executionContext = GrpcRxExecutionContext.forContext(requestContext); diff --git a/entity-data-service-rx-client/src/test/java/org/hypertrace/entity/data/service/rxclient/EntityDataCachingClientTest.java b/entity-data-service-rx-client/src/test/java/org/hypertrace/entity/data/service/rxclient/EntityDataCachingClientTest.java index ed56bc6a..083a1f6a 100644 --- a/entity-data-service-rx-client/src/test/java/org/hypertrace/entity/data/service/rxclient/EntityDataCachingClientTest.java +++ b/entity-data-service-rx-client/src/test/java/org/hypertrace/entity/data/service/rxclient/EntityDataCachingClientTest.java @@ -1,27 +1,19 @@ package org.hypertrace.entity.data.service.rxclient; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertSame; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import io.grpc.Context; import io.grpc.ManagedChannel; import io.grpc.Server; -import io.grpc.StatusRuntimeException; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver; -import io.reactivex.rxjava3.observers.TestObserver; import io.reactivex.rxjava3.plugins.RxJavaPlugins; import io.reactivex.rxjava3.schedulers.TestScheduler; import java.io.IOException; @@ -61,7 +53,6 @@ class EntityDataCachingClientTest { Server grpcServer; ManagedChannel grpcChannel; - Context grpcTestContext; List possibleResponseEntities; Optional responseError; TestScheduler testScheduler; @@ -78,7 +69,6 @@ void beforeEach() throws IOException { this.grpcChannel = InProcessChannelBuilder.forName(uniqueName).directExecutor().build(); this.dataClient = EntityDataClient.builder(this.grpcChannel).build(); when(this.mockContext.getTenantId()).thenReturn(Optional.of("default tenant")); - this.grpcTestContext = Context.current().withValue(RequestContext.CURRENT, this.mockContext); this.possibleResponseEntities = List.of(this.defaultResponseEntity); this.responseError = Optional.empty(); doAnswer( @@ -119,107 +109,13 @@ void afterEach() { } @Test - void cachesConsecutiveCallsForSameEntity() throws Exception { - Entity inputEntity = this.defaultResponseEntity.toBuilder().clearEntityName().build(); + void createOrUpdateCallsUpsert() { assertSame( this.defaultResponseEntity, - this.grpcTestContext.call( - () -> this.dataClient.getOrCreateEntity(inputEntity).blockingGet())); - - verify(this.mockDataService, times(1)).mergeAndUpsertEntity(any(), any()); - verifyNoMoreInteractions(this.mockDataService); - assertSame( - this.defaultResponseEntity, - this.grpcTestContext.call( - () -> this.dataClient.getOrCreateEntity(inputEntity).blockingGet())); - } - - @Test - void supportsMultipleConcurrentCacheKeys() throws Exception { - Entity inputEntity = this.defaultResponseEntity.toBuilder().clearEntityName().build(); - Entity defaultRetrieved = - this.grpcTestContext.call( - () -> this.dataClient.getOrCreateEntity(inputEntity).blockingGet()); - assertSame(this.defaultResponseEntity, defaultRetrieved); - verify(this.mockDataService, times(1)).mergeAndUpsertEntity(any(), any()); - - RequestContext otherMockContext = mock(RequestContext.class); - when(otherMockContext.getTenantId()).thenReturn(Optional.of("other tenant")); - Context otherGrpcContext = - Context.current().withValue(RequestContext.CURRENT, otherMockContext); - Entity otherEntityResponse = - this.defaultResponseEntity.toBuilder().setEntityName("name-2").build(); - - this.possibleResponseEntities = List.of(otherEntityResponse); - - Entity otherRetrieved = - otherGrpcContext.call(() -> this.dataClient.getOrCreateEntity(inputEntity).blockingGet()); - assertSame(otherEntityResponse, otherRetrieved); - assertNotSame(defaultRetrieved, otherRetrieved); - verify(this.mockDataService, times(2)).mergeAndUpsertEntity(any(), any()); - verifyNoMoreInteractions(this.mockDataService); - - assertSame( - defaultRetrieved, - this.grpcTestContext.call( - () -> this.dataClient.getOrCreateEntity(inputEntity).blockingGet())); - - assertSame( - otherRetrieved, - otherGrpcContext.call(() -> this.dataClient.getOrCreateEntity(inputEntity).blockingGet())); - } - - @Test - void retriesOnError() throws Exception { - Entity inputEntity = this.defaultResponseEntity.toBuilder().clearEntityName().build(); - this.responseError = Optional.of(new UnsupportedOperationException()); - - assertThrows( - StatusRuntimeException.class, - () -> - this.grpcTestContext.call( - () -> this.dataClient.getOrCreateEntity(inputEntity).blockingGet())); - verify(this.mockDataService, times(1)).mergeAndUpsertEntity(any(), any()); - - this.responseError = Optional.empty(); - assertSame( - this.defaultResponseEntity, - this.grpcTestContext.call( - () -> this.dataClient.getOrCreateEntity(inputEntity).blockingGet())); - verify(this.mockDataService, times(2)).mergeAndUpsertEntity(any(), any()); - } - - @Test - void hasConfigurableCacheSize() throws Exception { - this.dataClient = - EntityDataClient.builder(this.grpcChannel).withMaximumCacheContexts(1).build(); - - RequestContext otherMockContext = mock(RequestContext.class); - when(otherMockContext.getTenantId()).thenReturn(Optional.of("other tenant")); - this.grpcTestContext.call( - () -> this.dataClient.getOrCreateEntity(this.defaultResponseEntity).blockingGet()); - - // This call should evict the original call - Context.current() - .withValue(RequestContext.CURRENT, otherMockContext) - .call(() -> this.dataClient.getOrCreateEntity(this.defaultResponseEntity).blockingGet()); - - // Rerunning this call now fire again, a third server call - this.grpcTestContext.call( - () -> this.dataClient.getOrCreateEntity(this.defaultResponseEntity).blockingGet()); - verify(this.mockDataService, times(3)).mergeAndUpsertEntity(any(), any()); - } - - @Test - void createOrUpdateCallsUpsert() throws Exception { - assertSame( - this.defaultResponseEntity, - this.grpcTestContext.call( - () -> - this.dataClient - .createOrUpdateEntity( - this.defaultResponseEntity, UpsertCondition.getDefaultInstance()) - .blockingGet())); + this.dataClient + .createOrUpdateEntity( + mockContext, this.defaultResponseEntity, UpsertCondition.getDefaultInstance()) + .blockingGet()); verify(this.mockDataService, times(1)) .mergeAndUpsertEntity( @@ -232,46 +128,30 @@ void createOrUpdateCallsUpsert() throws Exception { } @Test - void createOrUpdateEventuallyThrottlesAndUsesLastProvidedValue() throws Exception { - TestObserver firstObserver = new TestObserver<>(); - TestObserver secondObserver = new TestObserver<>(); - TestObserver thirdObserver = new TestObserver<>(); + void createOrUpdateEventuallyThrottlesAndUsesLastProvidedValue() { this.possibleResponseEntities = Collections.emptyList(); // Just reflect entities in this test Entity firstEntity = this.defaultResponseEntity.toBuilder().setEntityName("first name").build(); Entity secondEntity = this.defaultResponseEntity.toBuilder().setEntityName("second name").build(); Entity thirdEntity = this.defaultResponseEntity.toBuilder().setEntityName("third name").build(); - this.grpcTestContext.run( - () -> - this.dataClient - .createOrUpdateEntityEventually( - firstEntity, UpsertCondition.getDefaultInstance(), Duration.ofMillis(1000)) - .subscribe(firstObserver)); + + this.dataClient.createOrUpdateEntityEventually( + mockContext, firstEntity, UpsertCondition.getDefaultInstance(), Duration.ofMillis(1000)); testScheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS); - this.grpcTestContext.run( - () -> - this.dataClient - .createOrUpdateEntityEventually( - secondEntity, UpsertCondition.getDefaultInstance(), Duration.ofMillis(200)) - .subscribe(secondObserver)); + + this.dataClient.createOrUpdateEntityEventually( + mockContext, secondEntity, UpsertCondition.getDefaultInstance(), Duration.ofMillis(200)); testScheduler.advanceTimeBy(150, TimeUnit.MILLISECONDS); - this.grpcTestContext.run( - () -> - this.dataClient - .createOrUpdateEntityEventually( - thirdEntity, UpsertCondition.getDefaultInstance(), Duration.ofMillis(5000)) - .subscribe(thirdObserver)); + this.dataClient.createOrUpdateEntityEventually( + mockContext, thirdEntity, UpsertCondition.getDefaultInstance(), Duration.ofMillis(5000)); testScheduler.advanceTimeBy(49, TimeUnit.MILLISECONDS); // All 3 should complete at 500ms (we're currently at 499ms) verifyNoInteractions(this.mockDataService); - firstObserver.assertNotComplete().assertNoErrors().assertNoValues(); - secondObserver.assertNotComplete().assertNoErrors().assertNoValues(); - thirdObserver.assertNotComplete().assertNoErrors().assertNoValues(); testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); @@ -283,17 +163,5 @@ void createOrUpdateEventuallyThrottlesAndUsesLastProvidedValue() throws Exceptio .setUpsertCondition(UpsertCondition.getDefaultInstance()) .build()), any()); - - firstObserver.assertResult(thirdEntity); - secondObserver.assertResult(thirdEntity); - thirdObserver.assertResult(thirdEntity); - - // Result should be in cache now, let's try to fetch and verify no server call - Entity fetchedEntity = - this.grpcTestContext.call( - () -> this.dataClient.getOrCreateEntity(firstEntity).blockingGet()); - - assertEquals(thirdEntity, fetchedEntity); - verifyNoMoreInteractions(this.mockDataService); } }