diff --git a/object-store/build.gradle.kts b/object-store/build.gradle.kts index c45cd00c..a828f9a4 100644 --- a/object-store/build.gradle.kts +++ b/object-store/build.gradle.kts @@ -10,6 +10,7 @@ dependencies { api(libs.hypertrace.grpcutils.context) implementation(projects.configServiceChangeEventGenerator) + implementation(libs.slf4j.api) annotationProcessor(libs.lombok) compileOnly(libs.lombok) diff --git a/object-store/src/main/java/org/hypertrace/config/objectstore/ContextuallyIdentifiedObjectStore.java b/object-store/src/main/java/org/hypertrace/config/objectstore/ContextuallyIdentifiedObjectStore.java index 482513d6..05709870 100644 --- a/object-store/src/main/java/org/hypertrace/config/objectstore/ContextuallyIdentifiedObjectStore.java +++ b/object-store/src/main/java/org/hypertrace/config/objectstore/ContextuallyIdentifiedObjectStore.java @@ -38,6 +38,14 @@ protected ContextuallyIdentifiedObjectStore( protected abstract Value buildValueFromData(T data); + protected Value buildValueForChangeEvent(T data) { + return this.buildValueFromData(data); + } + + protected String buildClassNameForChangeEvent(T data) { + return data.getClass().getName(); + } + protected abstract String getConfigContextFromRequestContext(RequestContext requestContext); private IdentifiedObjectStore buildObjectStoreForContext(RequestContext context) { @@ -86,6 +94,16 @@ protected Value buildValueFromData(T data) { return ContextuallyIdentifiedObjectStore.this.buildValueFromData(data); } + @Override + protected Value buildValueForChangeEvent(T data) { + return ContextuallyIdentifiedObjectStore.this.buildValueForChangeEvent(data); + } + + @Override + protected String buildClassNameForChangeEvent(T data) { + return ContextuallyIdentifiedObjectStore.this.buildClassNameForChangeEvent(data); + } + @Override protected String getContextFromData(T data) { return ContextuallyIdentifiedObjectStore.this.getConfigContextFromRequestContext( diff --git a/object-store/src/main/java/org/hypertrace/config/objectstore/DefaultObjectStore.java b/object-store/src/main/java/org/hypertrace/config/objectstore/DefaultObjectStore.java index 8c39ddae..386eb2ad 100644 --- a/object-store/src/main/java/org/hypertrace/config/objectstore/DefaultObjectStore.java +++ b/object-store/src/main/java/org/hypertrace/config/objectstore/DefaultObjectStore.java @@ -3,6 +3,7 @@ import com.google.protobuf.Value; import io.grpc.Status; import java.util.Optional; +import lombok.extern.slf4j.Slf4j; import org.hypertrace.config.service.change.event.api.ConfigChangeEventGenerator; import org.hypertrace.config.service.v1.ConfigServiceGrpc.ConfigServiceBlockingStub; import org.hypertrace.config.service.v1.ContextSpecificConfig; @@ -18,6 +19,7 @@ * * @param */ +@Slf4j public abstract class DefaultObjectStore { private final ConfigServiceBlockingStub configServiceBlockingStub; private final String resourceNamespace; @@ -49,6 +51,14 @@ protected DefaultObjectStore( protected abstract Value buildValueFromData(T data); + protected Value buildValueForChangeEvent(T data) { + return this.buildValueFromData(data); + } + + protected String buildClassNameForChangeEvent(T data) { + return data.getClass().getName(); + } + public Optional getData(RequestContext context) { try { Value value = @@ -91,12 +101,22 @@ public ConfigObject upsertObject(RequestContext context, T data) { if (response.hasPrevConfig()) { configChangeEventGenerator.sendUpdateNotification( context, - upsertedObject.getData().getClass().getName(), - response.getPrevConfig(), - response.getConfig()); + this.buildClassNameForChangeEvent(upsertedObject.getData()), + this.buildDataFromValue(response.getPrevConfig()) + .map(this::buildValueForChangeEvent) + .orElseGet( + () -> { + log.error( + "Unable to convert previousValue back to data for change event. Falling back to raw value {}", + response.getPrevConfig()); + return response.getPrevConfig(); + }), + this.buildValueForChangeEvent(upsertedObject.getData())); } else { configChangeEventGenerator.sendCreateNotification( - context, upsertedObject.getData().getClass().getName(), response.getConfig()); + context, + this.buildClassNameForChangeEvent(upsertedObject.getData()), + this.buildValueForChangeEvent(upsertedObject.getData())); } }); return upsertedObject; @@ -120,7 +140,9 @@ public Optional> deleteObject(RequestContext context) { configChangeEventGeneratorOptional.ifPresent( configChangeEventGenerator -> configChangeEventGenerator.sendDeleteNotification( - context, object.getData().getClass().getName(), deletedConfig.getConfig())); + context, + this.buildClassNameForChangeEvent(object.getData()), + this.buildValueForChangeEvent(object.getData()))); return Optional.of(object); } catch (Exception exception) { if (Status.fromThrowable(exception).equals(Status.NOT_FOUND)) { diff --git a/object-store/src/main/java/org/hypertrace/config/objectstore/IdentifiedObjectStore.java b/object-store/src/main/java/org/hypertrace/config/objectstore/IdentifiedObjectStore.java index 65cc0044..927cb808 100644 --- a/object-store/src/main/java/org/hypertrace/config/objectstore/IdentifiedObjectStore.java +++ b/object-store/src/main/java/org/hypertrace/config/objectstore/IdentifiedObjectStore.java @@ -5,6 +5,7 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; import org.hypertrace.config.service.change.event.api.ConfigChangeEventGenerator; import org.hypertrace.config.service.v1.ConfigServiceGrpc.ConfigServiceBlockingStub; import org.hypertrace.config.service.v1.ContextSpecificConfig; @@ -24,6 +25,7 @@ * * @param */ +@Slf4j public abstract class IdentifiedObjectStore { private final ConfigServiceBlockingStub configServiceBlockingStub; private final String resourceNamespace; @@ -57,6 +59,14 @@ protected IdentifiedObjectStore( protected abstract String getContextFromData(T data); + protected Value buildValueForChangeEvent(T data) { + return this.buildValueFromData(data); + } + + protected String buildClassNameForChangeEvent(T data) { + return data.getClass().getName(); + } + protected List> orderFetchedObjects( List> objects) { return objects; @@ -141,7 +151,10 @@ public Optional> deleteObject(RequestContext context, configChangeEventGeneratorOptional.ifPresent( configChangeEventGenerator -> configChangeEventGenerator.sendDeleteNotification( - context, object.getData().getClass().getName(), id, deletedConfig.getConfig())); + context, + this.buildClassNameForChangeEvent(object.getData()), + id, + this.buildValueForChangeEvent(object.getData()))); return Optional.of(object); } catch (Exception exception) { if (Status.fromThrowable(exception).equals(Status.NOT_FOUND)) { @@ -181,6 +194,8 @@ private Optional> processUpsertResult( Optional> optionalResult = ContextualConfigObjectImpl.tryBuild( response, this::buildDataFromValue, this::getContextFromData); + + System.out.println(optionalResult); optionalResult.ifPresent( result -> { if (response.hasPrevConfig()) { @@ -213,9 +228,9 @@ private void tryReportCreation(RequestContext requestContext, ContextualConfigOb configChangeEventGenerator -> configChangeEventGenerator.sendCreateNotification( requestContext, - result.getData().getClass().getName(), + this.buildClassNameForChangeEvent(result.getData()), result.getContext(), - this.buildValueFromData(result.getData()))); + this.buildValueForChangeEvent(result.getData()))); } private void tryReportUpdate( @@ -224,9 +239,17 @@ private void tryReportUpdate( configChangeEventGenerator -> configChangeEventGenerator.sendUpdateNotification( requestContext, - result.getData().getClass().getName(), + this.buildClassNameForChangeEvent(result.getData()), result.getContext(), - previousValue, - this.buildValueFromData(result.getData()))); + this.buildDataFromValue(previousValue) + .map(this::buildValueForChangeEvent) + .orElseGet( + () -> { + log.error( + "Unable to convert previousValue back to data for change event. Falling back to raw value {}", + previousValue); + return previousValue; + }), + this.buildValueForChangeEvent(result.getData()))); } } diff --git a/object-store/src/test/java/org/hypertrace/config/objectstore/DefaultObjectStoreTest.java b/object-store/src/test/java/org/hypertrace/config/objectstore/DefaultObjectStoreTest.java index f3c3e1cf..f8f83ee1 100644 --- a/object-store/src/test/java/org/hypertrace/config/objectstore/DefaultObjectStoreTest.java +++ b/object-store/src/test/java/org/hypertrace/config/objectstore/DefaultObjectStoreTest.java @@ -2,11 +2,13 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.protobuf.Struct; import com.google.protobuf.Value; import com.google.protobuf.util.Values; import io.grpc.Status; @@ -43,7 +45,7 @@ class DefaultObjectStoreTest { @Mock(answer = Answers.CALLS_REAL_METHODS) RequestContext mockRequestContext; - DefaultObjectStore store; + DefaultObjectStore store; @BeforeEach void beforeEach() { @@ -56,7 +58,8 @@ void generatesConfigReadRequestForGet() { when(this.mockStub.getConfig(any())) .thenReturn(GetConfigResponse.newBuilder().setConfig(Values.of("test")).build()); - assertEquals(Optional.of(new TestObject("test")), this.store.getData(this.mockRequestContext)); + assertEquals( + Optional.of(new TestInternalObject("test")), this.store.getData(this.mockRequestContext)); verify(this.mockStub, times(1)) .getConfig( @@ -91,7 +94,7 @@ void generatesConfigDeleteRequest() { assertEquals( Optional.of( new ConfigObjectImpl<>( - new TestObject("test"), TEST_CREATE_TIMESTAMP, TEST_UPDATE_TIMESTAMP)), + new TestInternalObject("test"), TEST_CREATE_TIMESTAMP, TEST_UPDATE_TIMESTAMP)), this.store.deleteObject(mockRequestContext)); verify(this.mockStub) @@ -104,6 +107,15 @@ void generatesConfigDeleteRequest() { when(this.mockStub.deleteConfig(any())).thenThrow(Status.NOT_FOUND.asRuntimeException()); assertEquals(Optional.empty(), this.store.deleteObject(mockRequestContext)); + + verify(this.configChangeEventGenerator, times(1)) + .sendDeleteNotification( + eq(this.mockRequestContext), + eq(TestApiObject.class.getName()), + eq( + Value.newBuilder() + .setStructValue(Struct.newBuilder().putFields("api_name", Values.of("test"))) + .build())); } @Test @@ -115,10 +127,12 @@ void generatesConfigUpsertRequest() { .setUpdateTimestamp(TEST_UPDATE_TIMESTAMP.toEpochMilli()) .setConfig(Values.of("updated")) .build()); - assertEquals( + ConfigObject configObject = new ConfigObjectImpl<>( - new TestObject("updated"), TEST_CREATE_TIMESTAMP, TEST_UPDATE_TIMESTAMP), - this.store.upsertObject(this.mockRequestContext, new TestObject("updated"))); + new TestInternalObject("updated"), TEST_CREATE_TIMESTAMP, TEST_UPDATE_TIMESTAMP); + assertEquals( + configObject, + this.store.upsertObject(this.mockRequestContext, new TestInternalObject("updated"))); verify(this.mockStub, times(1)) .upsertConfig( UpsertConfigRequest.newBuilder() @@ -126,27 +140,52 @@ void generatesConfigUpsertRequest() { .setResourceNamespace(TEST_RESOURCE_NAMESPACE) .setConfig(Values.of("updated")) .build()); + verify(this.configChangeEventGenerator, times(1)) + .sendCreateNotification( + eq(this.mockRequestContext), + eq(TestApiObject.class.getName()), + eq( + Value.newBuilder() + .setStructValue(Struct.newBuilder().putFields("api_name", Values.of("updated"))) + .build())); } @lombok.Value - private static class TestObject { + private static class TestInternalObject { String name; } - private static class TestObjectStore extends DefaultObjectStore { + @lombok.Value + private static class TestApiObject { + String api_name; + } + + private static class TestObjectStore extends DefaultObjectStore { private TestObjectStore( ConfigServiceBlockingStub stub, ConfigChangeEventGenerator configChangeEventGenerator) { super(stub, TEST_RESOURCE_NAMESPACE, TEST_RESOURCE_NAME, configChangeEventGenerator); } @Override - protected Optional buildDataFromValue(Value value) { - return Optional.of(new TestObject(value.getStringValue())); + protected Optional buildDataFromValue(Value value) { + return Optional.of(new TestInternalObject(value.getStringValue())); } @Override - protected Value buildValueFromData(TestObject object) { + protected Value buildValueFromData(TestInternalObject object) { return Values.of(object.getName()); } + + @Override + protected Value buildValueForChangeEvent(TestInternalObject object) { + return Value.newBuilder() + .setStructValue(Struct.newBuilder().putFields("api_name", Values.of(object.getName()))) + .build(); + } + + @Override + protected String buildClassNameForChangeEvent(TestInternalObject object) { + return TestApiObject.class.getName(); + } } } diff --git a/object-store/src/test/java/org/hypertrace/config/objectstore/IdentifiedObjectStoreTest.java b/object-store/src/test/java/org/hypertrace/config/objectstore/IdentifiedObjectStoreTest.java index 0e144cfe..3fc93c21 100644 --- a/object-store/src/test/java/org/hypertrace/config/objectstore/IdentifiedObjectStoreTest.java +++ b/object-store/src/test/java/org/hypertrace/config/objectstore/IdentifiedObjectStoreTest.java @@ -2,6 +2,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -48,9 +49,11 @@ class IdentifiedObjectStoreTest { private static final Instant TEST_CREATE_TIMESTAMP_2 = Instant.ofEpochMilli(30); private static final Instant TEST_UPDATE_TIMESTAMP = Instant.ofEpochMilli(40); - private static final TestObject OBJECT_1 = TestObject.builder().id("first-id").rank(1).build(); + private static final TestInternalObject OBJECT_1 = + TestInternalObject.builder().id("first-id").rank(1).build(); - private static final TestObject OBJECT_2 = TestObject.builder().id("second-id").rank(2).build(); + private static final TestInternalObject OBJECT_2 = + TestInternalObject.builder().id("second-id").rank(2).build(); private static final Value OBJECT_1_AS_VALUE = Value.newBuilder() @@ -75,7 +78,7 @@ class IdentifiedObjectStoreTest { @Mock(answer = Answers.CALLS_REAL_METHODS) RequestContext mockRequestContext; - IdentifiedObjectStore store; + IdentifiedObjectStore store; @BeforeEach void beforeEach() { @@ -174,6 +177,19 @@ void generatesConfigDeleteRequest() { when(this.mockStub.deleteConfig(any())).thenThrow(Status.NOT_FOUND.asRuntimeException()); assertEquals(Optional.empty(), this.store.deleteObject(mockRequestContext, "some-id")); + + verify(this.configChangeEventGenerator, times(1)) + .sendDeleteNotification( + eq(this.mockRequestContext), + eq(TestApiObject.class.getName()), + eq("some-id"), + eq( + Value.newBuilder() + .setStructValue( + Struct.newBuilder() + .putFields("api_id", Values.of(OBJECT_1.getId())) + .putFields("api_rank", Values.of(OBJECT_1.getRank()))) + .build())); } @Test @@ -185,10 +201,11 @@ void generatesConfigUpsertRequest() { .setCreationTimestamp(TEST_CREATE_TIMESTAMP_1.toEpochMilli()) .setUpdateTimestamp(TEST_UPDATE_TIMESTAMP.toEpochMilli()) .build()); - assertEquals( + ContextualConfigObject contextualConfigObject = new ContextualConfigObjectImpl<>( - OBJECT_1.getId(), OBJECT_1, TEST_CREATE_TIMESTAMP_1, TEST_UPDATE_TIMESTAMP), - this.store.upsertObject(this.mockRequestContext, OBJECT_1)); + OBJECT_1.getId(), OBJECT_1, TEST_CREATE_TIMESTAMP_1, TEST_UPDATE_TIMESTAMP); + assertEquals( + contextualConfigObject, this.store.upsertObject(this.mockRequestContext, OBJECT_1)); verify(this.mockStub, times(1)) .upsertConfig( UpsertConfigRequest.newBuilder() @@ -197,6 +214,18 @@ void generatesConfigUpsertRequest() { .setContext("first-id") .setConfig(OBJECT_1_AS_VALUE) .build()); + verify(this.configChangeEventGenerator, times(1)) + .sendCreateNotification( + eq(this.mockRequestContext), + eq(TestApiObject.class.getName()), + eq(contextualConfigObject.getContext()), + eq( + Value.newBuilder() + .setStructValue( + Struct.newBuilder() + .putFields("api_id", Values.of(OBJECT_1.getId())) + .putFields("api_rank", Values.of(OBJECT_1.getRank()))) + .build())); } @Test @@ -242,30 +271,42 @@ void generatesUpsertRequestsForUpsertAll() { .build()); } + @Test + void buildClassNameForChangeEvent_test() { + assertEquals(TestApiObject.class.getName(), this.store.buildClassNameForChangeEvent(OBJECT_1)); + } + @lombok.Value @Builder - private static class TestObject { + private static class TestInternalObject { String id; int rank; } - private static class TestObjectStore extends IdentifiedObjectStore { + @lombok.Value + @Builder + private static class TestApiObject { + String api_id; + int api_rank; + } + + private static class TestObjectStore extends IdentifiedObjectStore { private TestObjectStore( ConfigServiceBlockingStub stub, ConfigChangeEventGenerator configChangeEventGenerator) { super(stub, TEST_RESOURCE_NAMESPACE, TEST_RESOURCE_NAME, configChangeEventGenerator); } @Override - protected Optional buildDataFromValue(Value value) { + protected Optional buildDataFromValue(Value value) { return Optional.of( - TestObject.builder() + TestInternalObject.builder() .rank((int) value.getStructValue().getFieldsOrThrow("rank").getNumberValue()) .id(value.getStructValue().getFieldsOrThrow("id").getStringValue()) .build()); } @Override - protected Value buildValueFromData(TestObject object) { + protected Value buildValueFromData(TestInternalObject object) { return Value.newBuilder() .setStructValue( Struct.newBuilder() @@ -275,13 +316,28 @@ protected Value buildValueFromData(TestObject object) { } @Override - protected String getContextFromData(TestObject object) { + protected Value buildValueForChangeEvent(TestInternalObject object) { + return Value.newBuilder() + .setStructValue( + Struct.newBuilder() + .putFields("api_id", Values.of(object.getId())) + .putFields("api_rank", Values.of(object.getRank()))) + .build(); + } + + @Override + protected String buildClassNameForChangeEvent(TestInternalObject object) { + return TestApiObject.class.getName(); + } + + @Override + protected String getContextFromData(TestInternalObject object) { return object.getId(); } @Override - protected List> orderFetchedObjects( - List> objects) { + protected List> orderFetchedObjects( + List> objects) { return objects.stream() .sorted(Comparator.comparing(object -> object.getData().getRank())) .collect(Collectors.toUnmodifiableList());