Skip to content

Commit

Permalink
Ensure removal of Temporal exit message from traces (#6522)
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev committed May 11, 2023
1 parent ebe709e commit 4245303
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME;

import com.google.common.annotations.VisibleForTesting;
import datadog.trace.api.DDTags;
import datadog.trace.api.interceptor.MutableSpan;
import datadog.trace.api.interceptor.TraceInterceptor;
import io.micronaut.core.util.CollectionUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -31,10 +33,20 @@ public class TemporalSdkInterceptor implements TraceInterceptor {
*/
static final String SYNC_WORKFLOW_IMPL_RESOURCE_NAME = "SyncWorkflowImpl.run";

/**
* The {@code error.msg} tag name.
*/
static final String ERROR_MSG_TAG = "error.msg";

/**
* The {@code error.message} tag name.
*/
static final String ERROR_MESSAGE_TAG = DDTags.ERROR_MSG;

/**
* Error message tag key name that contains the Temporal exit error message.
*/
static final String ERROR_MESSAGE_TAG_KEY = "error.msg";
static final Set<String> ERROR_MESSAGE_TAG_KEYS = Set.of(ERROR_MSG_TAG, ERROR_MESSAGE_TAG);

/**
* Temporal exit error message text.
Expand Down Expand Up @@ -83,7 +95,8 @@ boolean isExitTrace(final MutableSpan trace) {
}

return trace.isError()
&& EXIT_ERROR_MESSAGE.equalsIgnoreCase(trace.getTags().getOrDefault(ERROR_MESSAGE_TAG_KEY, "").toString())
&& ERROR_MESSAGE_TAG_KEYS.stream().map(key -> trace.getTags().getOrDefault(key, "").toString())
.anyMatch(v -> EXIT_ERROR_MESSAGE.equalsIgnoreCase(v))
&& (safeEquals(trace.getOperationName(), WORKFLOW_TRACE_OPERATION_NAME)
|| safeEquals(trace.getResourceName(), CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME)
|| safeEquals(trace.getResourceName(), SYNC_WORKFLOW_IMPL_RESOURCE_NAME));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME;
import static io.airbyte.workers.tracing.TemporalSdkInterceptor.CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME;
import static io.airbyte.workers.tracing.TemporalSdkInterceptor.ERROR_MESSAGE_TAG_KEY;
import static io.airbyte.workers.tracing.TemporalSdkInterceptor.ERROR_MESSAGE_TAG;
import static io.airbyte.workers.tracing.TemporalSdkInterceptor.ERROR_MSG_TAG;
import static io.airbyte.workers.tracing.TemporalSdkInterceptor.EXIT_ERROR_MESSAGE;
import static io.airbyte.workers.tracing.TemporalSdkInterceptor.SYNC_WORKFLOW_IMPL_RESOURCE_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -21,44 +22,108 @@
*/
class TemporalSdkInterceptorTest {

private static final String OTHER_OPERATION = "OtherOperation";
private static final String OTHER_RESOURCE = "OtherResource";
private static final String SOME_OTHER_ERROR = "some other error";
private static final String TAG = "tag";
private static final String VALUE = "value";

@Test
void testOnTraceCompleteErrorMsg() {
final var simple = new DummySpan();

final var noError = new DummySpan();
noError.setError(false);
noError.setOperationName(WORKFLOW_TRACE_OPERATION_NAME);
noError.setTag(TAG, VALUE);

final var otherError = new DummySpan();
otherError.setError(true);
otherError.setOperationName(WORKFLOW_TRACE_OPERATION_NAME);
otherError.setTag(ERROR_MSG_TAG, SOME_OTHER_ERROR);

final var temporalExitMsgOperationNameError = new DummySpan();
temporalExitMsgOperationNameError.setError(true);
temporalExitMsgOperationNameError.setOperationName(WORKFLOW_TRACE_OPERATION_NAME);
temporalExitMsgOperationNameError.setTag(ERROR_MSG_TAG, EXIT_ERROR_MESSAGE);

final var connectionManagerTemporalExitMsgResourceNameError = new DummySpan();
connectionManagerTemporalExitMsgResourceNameError.setError(true);
connectionManagerTemporalExitMsgResourceNameError.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME);
connectionManagerTemporalExitMsgResourceNameError.setTag(ERROR_MSG_TAG, EXIT_ERROR_MESSAGE);

final var syncWorkflowTemporalExitMsgResourceNameError = new DummySpan();
syncWorkflowTemporalExitMsgResourceNameError.setError(true);
syncWorkflowTemporalExitMsgResourceNameError.setResourceName(SYNC_WORKFLOW_IMPL_RESOURCE_NAME);
syncWorkflowTemporalExitMsgResourceNameError.setTag(ERROR_MSG_TAG, EXIT_ERROR_MESSAGE);

final var temporalExitMsgOtherOperationError = new DummySpan();
temporalExitMsgOtherOperationError.setError(true);
temporalExitMsgOtherOperationError.setOperationName(OTHER_OPERATION);
temporalExitMsgOtherOperationError.setTag(ERROR_MSG_TAG, EXIT_ERROR_MESSAGE);

final var temporalExitMsgOtherResourceError = new DummySpan();
temporalExitMsgOtherResourceError.setError(true);
temporalExitMsgOtherResourceError.setResourceName(OTHER_RESOURCE);
temporalExitMsgOtherResourceError.setTag(ERROR_MSG_TAG, EXIT_ERROR_MESSAGE);

final var spans = List.of(
simple, noError, otherError, temporalExitMsgOperationNameError, connectionManagerTemporalExitMsgResourceNameError,
syncWorkflowTemporalExitMsgResourceNameError, temporalExitMsgOtherOperationError,
temporalExitMsgOtherResourceError);

final var interceptor = new TemporalSdkInterceptor();
final var actual = interceptor.onTraceComplete(spans);

assertEquals(spans, actual);
assertFalse(simple.isError());
assertFalse(noError.isError());
assertTrue(otherError.isError());
assertFalse(temporalExitMsgOperationNameError.isError());
assertFalse(connectionManagerTemporalExitMsgResourceNameError.isError());
assertFalse(syncWorkflowTemporalExitMsgResourceNameError.isError());
assertTrue(temporalExitMsgOtherOperationError.isError());
assertTrue(temporalExitMsgOtherResourceError.isError());
}

@Test
void testOnTraceComplete() {
void testOnTraceCompleteErrorMessage() {
final var simple = new DummySpan();

final var noError = new DummySpan();
noError.setError(false);
noError.setOperationName(WORKFLOW_TRACE_OPERATION_NAME);
noError.setTag("tag", "value");
noError.setTag(TAG, VALUE);

final var otherError = new DummySpan();
otherError.setError(true);
otherError.setOperationName(WORKFLOW_TRACE_OPERATION_NAME);
otherError.setTag(ERROR_MESSAGE_TAG_KEY, "some other error");
otherError.setTag(ERROR_MESSAGE_TAG, SOME_OTHER_ERROR);

final var temporalExitMsgOperationNameError = new DummySpan();
temporalExitMsgOperationNameError.setError(true);
temporalExitMsgOperationNameError.setOperationName(WORKFLOW_TRACE_OPERATION_NAME);
temporalExitMsgOperationNameError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);
temporalExitMsgOperationNameError.setTag(ERROR_MESSAGE_TAG, EXIT_ERROR_MESSAGE);

final var connectionManagerTemporalExitMsgResourceNameError = new DummySpan();
connectionManagerTemporalExitMsgResourceNameError.setError(true);
connectionManagerTemporalExitMsgResourceNameError.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME);
connectionManagerTemporalExitMsgResourceNameError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);
connectionManagerTemporalExitMsgResourceNameError.setTag(ERROR_MESSAGE_TAG, EXIT_ERROR_MESSAGE);

final var syncWorkflowTemporalExitMsgResourceNameError = new DummySpan();
syncWorkflowTemporalExitMsgResourceNameError.setError(true);
syncWorkflowTemporalExitMsgResourceNameError.setResourceName(SYNC_WORKFLOW_IMPL_RESOURCE_NAME);
syncWorkflowTemporalExitMsgResourceNameError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);
syncWorkflowTemporalExitMsgResourceNameError.setTag(ERROR_MESSAGE_TAG, EXIT_ERROR_MESSAGE);

final var temporalExitMsgOtherOperationError = new DummySpan();
temporalExitMsgOtherOperationError.setError(true);
temporalExitMsgOtherOperationError.setOperationName("OtherOperation");
temporalExitMsgOtherOperationError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);
temporalExitMsgOtherOperationError.setOperationName(OTHER_OPERATION);
temporalExitMsgOtherOperationError.setTag(ERROR_MESSAGE_TAG, EXIT_ERROR_MESSAGE);

final var temporalExitMsgOtherResourceError = new DummySpan();
temporalExitMsgOtherResourceError.setError(true);
temporalExitMsgOtherResourceError.setResourceName("OtherResource");
temporalExitMsgOtherResourceError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);
temporalExitMsgOtherResourceError.setResourceName(OTHER_RESOURCE);
temporalExitMsgOtherResourceError.setTag(ERROR_MESSAGE_TAG, EXIT_ERROR_MESSAGE);

final var spans = List.of(
simple, noError, otherError, temporalExitMsgOperationNameError, connectionManagerTemporalExitMsgResourceNameError,
Expand All @@ -80,7 +145,62 @@ void testOnTraceComplete() {
}

@Test
void testIsExitTrace() {
void testIsExitTraceErrorMsg() {
final var interceptor = new TemporalSdkInterceptor();

assertEquals(false, interceptor.isExitTrace(null));
assertEquals(false, interceptor.isExitTrace(new DummySpan()));

final var temporalTraceWithOperationName = new DummySpan();
temporalTraceWithOperationName.setOperationName(WORKFLOW_TRACE_OPERATION_NAME);
assertEquals(false, interceptor.isExitTrace(temporalTraceWithOperationName));

final var temporalTraceWithResourceName = new DummySpan();
temporalTraceWithResourceName.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME);
assertEquals(false, interceptor.isExitTrace(temporalTraceWithResourceName));

final var temporalTraceWithErrorAndOperationName = new DummySpan();
temporalTraceWithErrorAndOperationName.setError(true);
temporalTraceWithErrorAndOperationName.setOperationName(WORKFLOW_TRACE_OPERATION_NAME);
assertEquals(false, interceptor.isExitTrace(temporalTraceWithErrorAndOperationName));

final var temporalTraceWithErrorAndConnectionManagerResourceName = new DummySpan();
temporalTraceWithErrorAndConnectionManagerResourceName.setError(true);
temporalTraceWithErrorAndConnectionManagerResourceName.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME);
assertEquals(false, interceptor.isExitTrace(temporalTraceWithErrorAndConnectionManagerResourceName));

final var temporalTraceWithErrorAndSyncWorkflowResourceName = new DummySpan();
temporalTraceWithErrorAndSyncWorkflowResourceName.setError(true);
temporalTraceWithErrorAndSyncWorkflowResourceName.setResourceName(SYNC_WORKFLOW_IMPL_RESOURCE_NAME);
assertEquals(false, interceptor.isExitTrace(temporalTraceWithErrorAndSyncWorkflowResourceName));

final var temporalTraceWithExitErrorAndOperationName = new DummySpan();
temporalTraceWithExitErrorAndOperationName.setError(true);
temporalTraceWithExitErrorAndOperationName.setOperationName(WORKFLOW_TRACE_OPERATION_NAME);
temporalTraceWithExitErrorAndOperationName.setTag(ERROR_MSG_TAG, EXIT_ERROR_MESSAGE);
assertEquals(true, interceptor.isExitTrace(temporalTraceWithExitErrorAndOperationName));

final var temporalTraceWithExitErrorAndResourceName = new DummySpan();
temporalTraceWithExitErrorAndResourceName.setError(true);
temporalTraceWithExitErrorAndResourceName.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME);
temporalTraceWithExitErrorAndResourceName.setTag(ERROR_MSG_TAG, EXIT_ERROR_MESSAGE);
assertEquals(true, interceptor.isExitTrace(temporalTraceWithExitErrorAndResourceName));

final var otherTemporalTraceWithExitErrorAndOtherOperationName = new DummySpan();
otherTemporalTraceWithExitErrorAndOtherOperationName.setError(true);
otherTemporalTraceWithExitErrorAndOtherOperationName.setOperationName(OTHER_OPERATION);
otherTemporalTraceWithExitErrorAndOtherOperationName.setTag(ERROR_MSG_TAG, EXIT_ERROR_MESSAGE);
assertEquals(false, interceptor.isExitTrace(otherTemporalTraceWithExitErrorAndOtherOperationName));

final var otherTemporalTraceWithExitErrorAndOtherResourceName = new DummySpan();
otherTemporalTraceWithExitErrorAndOtherResourceName.setError(true);
otherTemporalTraceWithExitErrorAndOtherResourceName.setResourceName(OTHER_RESOURCE);
otherTemporalTraceWithExitErrorAndOtherResourceName.setTag(ERROR_MSG_TAG, EXIT_ERROR_MESSAGE);
assertEquals(false, interceptor.isExitTrace(otherTemporalTraceWithExitErrorAndOtherResourceName));
}

@Test
void testIsExitTraceErrorMessage() {
final var interceptor = new TemporalSdkInterceptor();

assertEquals(false, interceptor.isExitTrace(null));
Expand Down Expand Up @@ -112,25 +232,25 @@ void testIsExitTrace() {
final var temporalTraceWithExitErrorAndOperationName = new DummySpan();
temporalTraceWithExitErrorAndOperationName.setError(true);
temporalTraceWithExitErrorAndOperationName.setOperationName(WORKFLOW_TRACE_OPERATION_NAME);
temporalTraceWithExitErrorAndOperationName.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);
temporalTraceWithExitErrorAndOperationName.setTag(ERROR_MESSAGE_TAG, EXIT_ERROR_MESSAGE);
assertEquals(true, interceptor.isExitTrace(temporalTraceWithExitErrorAndOperationName));

final var temporalTraceWithExitErrorAndResourceName = new DummySpan();
temporalTraceWithExitErrorAndResourceName.setError(true);
temporalTraceWithExitErrorAndResourceName.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME);
temporalTraceWithExitErrorAndResourceName.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);
temporalTraceWithExitErrorAndResourceName.setTag(ERROR_MESSAGE_TAG, EXIT_ERROR_MESSAGE);
assertEquals(true, interceptor.isExitTrace(temporalTraceWithExitErrorAndResourceName));

final var otherTemporalTraceWithExitErrorAndOtherOperationName = new DummySpan();
otherTemporalTraceWithExitErrorAndOtherOperationName.setError(true);
otherTemporalTraceWithExitErrorAndOtherOperationName.setOperationName("OtherOperation");
otherTemporalTraceWithExitErrorAndOtherOperationName.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);
otherTemporalTraceWithExitErrorAndOtherOperationName.setOperationName(OTHER_OPERATION);
otherTemporalTraceWithExitErrorAndOtherOperationName.setTag(ERROR_MESSAGE_TAG, EXIT_ERROR_MESSAGE);
assertEquals(false, interceptor.isExitTrace(otherTemporalTraceWithExitErrorAndOtherOperationName));

final var otherTemporalTraceWithExitErrorAndOtherResourceName = new DummySpan();
otherTemporalTraceWithExitErrorAndOtherResourceName.setError(true);
otherTemporalTraceWithExitErrorAndOtherResourceName.setResourceName("OtherResource");
otherTemporalTraceWithExitErrorAndOtherResourceName.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);
otherTemporalTraceWithExitErrorAndOtherResourceName.setResourceName(OTHER_RESOURCE);
otherTemporalTraceWithExitErrorAndOtherResourceName.setTag(ERROR_MESSAGE_TAG, EXIT_ERROR_MESSAGE);
assertEquals(false, interceptor.isExitTrace(otherTemporalTraceWithExitErrorAndOtherResourceName));
}

Expand Down

0 comments on commit 4245303

Please sign in to comment.