From 424530343a611a26b6eed3a984962e5b71dd72ef Mon Sep 17 00:00:00 2001 From: Jonathan Pearlin Date: Thu, 11 May 2023 18:22:27 -0400 Subject: [PATCH] Ensure removal of Temporal exit message from traces (#6522) --- .../tracing/TemporalSdkInterceptor.java | 17 +- .../tracing/TemporalSdkInterceptorTest.java | 156 ++++++++++++++++-- 2 files changed, 153 insertions(+), 20 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/tracing/TemporalSdkInterceptor.java b/airbyte-workers/src/main/java/io/airbyte/workers/tracing/TemporalSdkInterceptor.java index f975aa165f5..d8009a0f8e6 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/tracing/TemporalSdkInterceptor.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/tracing/TemporalSdkInterceptor.java @@ -7,11 +7,13 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME; import com.google.common.annotations.VisibleForTesting; +import datadog.trace.api.DDTags; import datadog.trace.api.interceptor.MutableSpan; import datadog.trace.api.interceptor.TraceInterceptor; import io.micronaut.core.util.CollectionUtils; import java.util.ArrayList; import java.util.Collection; +import java.util.Set; import lombok.extern.slf4j.Slf4j; /** @@ -31,10 +33,20 @@ public class TemporalSdkInterceptor implements TraceInterceptor { */ static final String SYNC_WORKFLOW_IMPL_RESOURCE_NAME = "SyncWorkflowImpl.run"; + /** + * The {@code error.msg} tag name. + */ + static final String ERROR_MSG_TAG = "error.msg"; + + /** + * The {@code error.message} tag name. + */ + static final String ERROR_MESSAGE_TAG = DDTags.ERROR_MSG; + /** * Error message tag key name that contains the Temporal exit error message. */ - static final String ERROR_MESSAGE_TAG_KEY = "error.msg"; + static final Set ERROR_MESSAGE_TAG_KEYS = Set.of(ERROR_MSG_TAG, ERROR_MESSAGE_TAG); /** * Temporal exit error message text. @@ -83,7 +95,8 @@ boolean isExitTrace(final MutableSpan trace) { } return trace.isError() - && EXIT_ERROR_MESSAGE.equalsIgnoreCase(trace.getTags().getOrDefault(ERROR_MESSAGE_TAG_KEY, "").toString()) + && ERROR_MESSAGE_TAG_KEYS.stream().map(key -> trace.getTags().getOrDefault(key, "").toString()) + .anyMatch(v -> EXIT_ERROR_MESSAGE.equalsIgnoreCase(v)) && (safeEquals(trace.getOperationName(), WORKFLOW_TRACE_OPERATION_NAME) || safeEquals(trace.getResourceName(), CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME) || safeEquals(trace.getResourceName(), SYNC_WORKFLOW_IMPL_RESOURCE_NAME)); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/tracing/TemporalSdkInterceptorTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/tracing/TemporalSdkInterceptorTest.java index b2c9ed1bf76..f30efbd7e97 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/tracing/TemporalSdkInterceptorTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/tracing/TemporalSdkInterceptorTest.java @@ -6,7 +6,8 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME; import static io.airbyte.workers.tracing.TemporalSdkInterceptor.CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME; -import static io.airbyte.workers.tracing.TemporalSdkInterceptor.ERROR_MESSAGE_TAG_KEY; +import static io.airbyte.workers.tracing.TemporalSdkInterceptor.ERROR_MESSAGE_TAG; +import static io.airbyte.workers.tracing.TemporalSdkInterceptor.ERROR_MSG_TAG; import static io.airbyte.workers.tracing.TemporalSdkInterceptor.EXIT_ERROR_MESSAGE; import static io.airbyte.workers.tracing.TemporalSdkInterceptor.SYNC_WORKFLOW_IMPL_RESOURCE_NAME; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -21,44 +22,108 @@ */ class TemporalSdkInterceptorTest { + private static final String OTHER_OPERATION = "OtherOperation"; + private static final String OTHER_RESOURCE = "OtherResource"; + private static final String SOME_OTHER_ERROR = "some other error"; + private static final String TAG = "tag"; + private static final String VALUE = "value"; + + @Test + void testOnTraceCompleteErrorMsg() { + final var simple = new DummySpan(); + + final var noError = new DummySpan(); + noError.setError(false); + noError.setOperationName(WORKFLOW_TRACE_OPERATION_NAME); + noError.setTag(TAG, VALUE); + + final var otherError = new DummySpan(); + otherError.setError(true); + otherError.setOperationName(WORKFLOW_TRACE_OPERATION_NAME); + otherError.setTag(ERROR_MSG_TAG, SOME_OTHER_ERROR); + + final var temporalExitMsgOperationNameError = new DummySpan(); + temporalExitMsgOperationNameError.setError(true); + temporalExitMsgOperationNameError.setOperationName(WORKFLOW_TRACE_OPERATION_NAME); + temporalExitMsgOperationNameError.setTag(ERROR_MSG_TAG, EXIT_ERROR_MESSAGE); + + final var connectionManagerTemporalExitMsgResourceNameError = new DummySpan(); + connectionManagerTemporalExitMsgResourceNameError.setError(true); + connectionManagerTemporalExitMsgResourceNameError.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME); + connectionManagerTemporalExitMsgResourceNameError.setTag(ERROR_MSG_TAG, EXIT_ERROR_MESSAGE); + + final var syncWorkflowTemporalExitMsgResourceNameError = new DummySpan(); + syncWorkflowTemporalExitMsgResourceNameError.setError(true); + syncWorkflowTemporalExitMsgResourceNameError.setResourceName(SYNC_WORKFLOW_IMPL_RESOURCE_NAME); + syncWorkflowTemporalExitMsgResourceNameError.setTag(ERROR_MSG_TAG, EXIT_ERROR_MESSAGE); + + final var temporalExitMsgOtherOperationError = new DummySpan(); + temporalExitMsgOtherOperationError.setError(true); + temporalExitMsgOtherOperationError.setOperationName(OTHER_OPERATION); + temporalExitMsgOtherOperationError.setTag(ERROR_MSG_TAG, EXIT_ERROR_MESSAGE); + + final var temporalExitMsgOtherResourceError = new DummySpan(); + temporalExitMsgOtherResourceError.setError(true); + temporalExitMsgOtherResourceError.setResourceName(OTHER_RESOURCE); + temporalExitMsgOtherResourceError.setTag(ERROR_MSG_TAG, EXIT_ERROR_MESSAGE); + + final var spans = List.of( + simple, noError, otherError, temporalExitMsgOperationNameError, connectionManagerTemporalExitMsgResourceNameError, + syncWorkflowTemporalExitMsgResourceNameError, temporalExitMsgOtherOperationError, + temporalExitMsgOtherResourceError); + + final var interceptor = new TemporalSdkInterceptor(); + final var actual = interceptor.onTraceComplete(spans); + + assertEquals(spans, actual); + assertFalse(simple.isError()); + assertFalse(noError.isError()); + assertTrue(otherError.isError()); + assertFalse(temporalExitMsgOperationNameError.isError()); + assertFalse(connectionManagerTemporalExitMsgResourceNameError.isError()); + assertFalse(syncWorkflowTemporalExitMsgResourceNameError.isError()); + assertTrue(temporalExitMsgOtherOperationError.isError()); + assertTrue(temporalExitMsgOtherResourceError.isError()); + } + @Test - void testOnTraceComplete() { + void testOnTraceCompleteErrorMessage() { final var simple = new DummySpan(); final var noError = new DummySpan(); noError.setError(false); noError.setOperationName(WORKFLOW_TRACE_OPERATION_NAME); - noError.setTag("tag", "value"); + noError.setTag(TAG, VALUE); final var otherError = new DummySpan(); otherError.setError(true); otherError.setOperationName(WORKFLOW_TRACE_OPERATION_NAME); - otherError.setTag(ERROR_MESSAGE_TAG_KEY, "some other error"); + otherError.setTag(ERROR_MESSAGE_TAG, SOME_OTHER_ERROR); final var temporalExitMsgOperationNameError = new DummySpan(); temporalExitMsgOperationNameError.setError(true); temporalExitMsgOperationNameError.setOperationName(WORKFLOW_TRACE_OPERATION_NAME); - temporalExitMsgOperationNameError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE); + temporalExitMsgOperationNameError.setTag(ERROR_MESSAGE_TAG, EXIT_ERROR_MESSAGE); final var connectionManagerTemporalExitMsgResourceNameError = new DummySpan(); connectionManagerTemporalExitMsgResourceNameError.setError(true); connectionManagerTemporalExitMsgResourceNameError.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME); - connectionManagerTemporalExitMsgResourceNameError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE); + connectionManagerTemporalExitMsgResourceNameError.setTag(ERROR_MESSAGE_TAG, EXIT_ERROR_MESSAGE); final var syncWorkflowTemporalExitMsgResourceNameError = new DummySpan(); syncWorkflowTemporalExitMsgResourceNameError.setError(true); syncWorkflowTemporalExitMsgResourceNameError.setResourceName(SYNC_WORKFLOW_IMPL_RESOURCE_NAME); - syncWorkflowTemporalExitMsgResourceNameError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE); + syncWorkflowTemporalExitMsgResourceNameError.setTag(ERROR_MESSAGE_TAG, EXIT_ERROR_MESSAGE); final var temporalExitMsgOtherOperationError = new DummySpan(); temporalExitMsgOtherOperationError.setError(true); - temporalExitMsgOtherOperationError.setOperationName("OtherOperation"); - temporalExitMsgOtherOperationError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE); + temporalExitMsgOtherOperationError.setOperationName(OTHER_OPERATION); + temporalExitMsgOtherOperationError.setTag(ERROR_MESSAGE_TAG, EXIT_ERROR_MESSAGE); final var temporalExitMsgOtherResourceError = new DummySpan(); temporalExitMsgOtherResourceError.setError(true); - temporalExitMsgOtherResourceError.setResourceName("OtherResource"); - temporalExitMsgOtherResourceError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE); + temporalExitMsgOtherResourceError.setResourceName(OTHER_RESOURCE); + temporalExitMsgOtherResourceError.setTag(ERROR_MESSAGE_TAG, EXIT_ERROR_MESSAGE); final var spans = List.of( simple, noError, otherError, temporalExitMsgOperationNameError, connectionManagerTemporalExitMsgResourceNameError, @@ -80,7 +145,62 @@ void testOnTraceComplete() { } @Test - void testIsExitTrace() { + void testIsExitTraceErrorMsg() { + final var interceptor = new TemporalSdkInterceptor(); + + assertEquals(false, interceptor.isExitTrace(null)); + assertEquals(false, interceptor.isExitTrace(new DummySpan())); + + final var temporalTraceWithOperationName = new DummySpan(); + temporalTraceWithOperationName.setOperationName(WORKFLOW_TRACE_OPERATION_NAME); + assertEquals(false, interceptor.isExitTrace(temporalTraceWithOperationName)); + + final var temporalTraceWithResourceName = new DummySpan(); + temporalTraceWithResourceName.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME); + assertEquals(false, interceptor.isExitTrace(temporalTraceWithResourceName)); + + final var temporalTraceWithErrorAndOperationName = new DummySpan(); + temporalTraceWithErrorAndOperationName.setError(true); + temporalTraceWithErrorAndOperationName.setOperationName(WORKFLOW_TRACE_OPERATION_NAME); + assertEquals(false, interceptor.isExitTrace(temporalTraceWithErrorAndOperationName)); + + final var temporalTraceWithErrorAndConnectionManagerResourceName = new DummySpan(); + temporalTraceWithErrorAndConnectionManagerResourceName.setError(true); + temporalTraceWithErrorAndConnectionManagerResourceName.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME); + assertEquals(false, interceptor.isExitTrace(temporalTraceWithErrorAndConnectionManagerResourceName)); + + final var temporalTraceWithErrorAndSyncWorkflowResourceName = new DummySpan(); + temporalTraceWithErrorAndSyncWorkflowResourceName.setError(true); + temporalTraceWithErrorAndSyncWorkflowResourceName.setResourceName(SYNC_WORKFLOW_IMPL_RESOURCE_NAME); + assertEquals(false, interceptor.isExitTrace(temporalTraceWithErrorAndSyncWorkflowResourceName)); + + final var temporalTraceWithExitErrorAndOperationName = new DummySpan(); + temporalTraceWithExitErrorAndOperationName.setError(true); + temporalTraceWithExitErrorAndOperationName.setOperationName(WORKFLOW_TRACE_OPERATION_NAME); + temporalTraceWithExitErrorAndOperationName.setTag(ERROR_MSG_TAG, EXIT_ERROR_MESSAGE); + assertEquals(true, interceptor.isExitTrace(temporalTraceWithExitErrorAndOperationName)); + + final var temporalTraceWithExitErrorAndResourceName = new DummySpan(); + temporalTraceWithExitErrorAndResourceName.setError(true); + temporalTraceWithExitErrorAndResourceName.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME); + temporalTraceWithExitErrorAndResourceName.setTag(ERROR_MSG_TAG, EXIT_ERROR_MESSAGE); + assertEquals(true, interceptor.isExitTrace(temporalTraceWithExitErrorAndResourceName)); + + final var otherTemporalTraceWithExitErrorAndOtherOperationName = new DummySpan(); + otherTemporalTraceWithExitErrorAndOtherOperationName.setError(true); + otherTemporalTraceWithExitErrorAndOtherOperationName.setOperationName(OTHER_OPERATION); + otherTemporalTraceWithExitErrorAndOtherOperationName.setTag(ERROR_MSG_TAG, EXIT_ERROR_MESSAGE); + assertEquals(false, interceptor.isExitTrace(otherTemporalTraceWithExitErrorAndOtherOperationName)); + + final var otherTemporalTraceWithExitErrorAndOtherResourceName = new DummySpan(); + otherTemporalTraceWithExitErrorAndOtherResourceName.setError(true); + otherTemporalTraceWithExitErrorAndOtherResourceName.setResourceName(OTHER_RESOURCE); + otherTemporalTraceWithExitErrorAndOtherResourceName.setTag(ERROR_MSG_TAG, EXIT_ERROR_MESSAGE); + assertEquals(false, interceptor.isExitTrace(otherTemporalTraceWithExitErrorAndOtherResourceName)); + } + + @Test + void testIsExitTraceErrorMessage() { final var interceptor = new TemporalSdkInterceptor(); assertEquals(false, interceptor.isExitTrace(null)); @@ -112,25 +232,25 @@ void testIsExitTrace() { final var temporalTraceWithExitErrorAndOperationName = new DummySpan(); temporalTraceWithExitErrorAndOperationName.setError(true); temporalTraceWithExitErrorAndOperationName.setOperationName(WORKFLOW_TRACE_OPERATION_NAME); - temporalTraceWithExitErrorAndOperationName.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE); + temporalTraceWithExitErrorAndOperationName.setTag(ERROR_MESSAGE_TAG, EXIT_ERROR_MESSAGE); assertEquals(true, interceptor.isExitTrace(temporalTraceWithExitErrorAndOperationName)); final var temporalTraceWithExitErrorAndResourceName = new DummySpan(); temporalTraceWithExitErrorAndResourceName.setError(true); temporalTraceWithExitErrorAndResourceName.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME); - temporalTraceWithExitErrorAndResourceName.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE); + temporalTraceWithExitErrorAndResourceName.setTag(ERROR_MESSAGE_TAG, EXIT_ERROR_MESSAGE); assertEquals(true, interceptor.isExitTrace(temporalTraceWithExitErrorAndResourceName)); final var otherTemporalTraceWithExitErrorAndOtherOperationName = new DummySpan(); otherTemporalTraceWithExitErrorAndOtherOperationName.setError(true); - otherTemporalTraceWithExitErrorAndOtherOperationName.setOperationName("OtherOperation"); - otherTemporalTraceWithExitErrorAndOtherOperationName.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE); + otherTemporalTraceWithExitErrorAndOtherOperationName.setOperationName(OTHER_OPERATION); + otherTemporalTraceWithExitErrorAndOtherOperationName.setTag(ERROR_MESSAGE_TAG, EXIT_ERROR_MESSAGE); assertEquals(false, interceptor.isExitTrace(otherTemporalTraceWithExitErrorAndOtherOperationName)); final var otherTemporalTraceWithExitErrorAndOtherResourceName = new DummySpan(); otherTemporalTraceWithExitErrorAndOtherResourceName.setError(true); - otherTemporalTraceWithExitErrorAndOtherResourceName.setResourceName("OtherResource"); - otherTemporalTraceWithExitErrorAndOtherResourceName.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE); + otherTemporalTraceWithExitErrorAndOtherResourceName.setResourceName(OTHER_RESOURCE); + otherTemporalTraceWithExitErrorAndOtherResourceName.setTag(ERROR_MESSAGE_TAG, EXIT_ERROR_MESSAGE); assertEquals(false, interceptor.isExitTrace(otherTemporalTraceWithExitErrorAndOtherResourceName)); }