diff --git a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceConstants.java b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceConstants.java index ae0cd6bd2c5b33..23ae35754c7fbe 100644 --- a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceConstants.java +++ b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceConstants.java @@ -36,6 +36,11 @@ private ApmTraceConstants() {} */ public static final class Tags { + /** + * Name of the APM trace tag that holds the attempt number value associated with the trace. + */ + public static final String ATTEMPT_NUMBER_KEY = "attempt_number"; + /** * Name of the APM trace tag that holds the destination Docker image value associated with the * trace. diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java index 608c506300959a..167ae3ac7282b2 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java @@ -5,6 +5,7 @@ package io.airbyte.workers.temporal.check.connection; import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DOCKER_IMAGE_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; @@ -85,7 +86,8 @@ public CheckConnectionActivityImpl(@Named("checkWorkerConfigs") final WorkerConf @Override public ConnectorJobOutput runWithJobOutput(final CheckConnectionInput args) { ApmTraceUtils - .addTagsToTrace(Map.of(JOB_ID_KEY, args.getJobRunConfig().getJobId(), DOCKER_IMAGE_KEY, args.getLauncherConfig().getDockerImage())); + .addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, args.getJobRunConfig().getAttemptId(), JOB_ID_KEY, args.getJobRunConfig().getJobId(), + DOCKER_IMAGE_KEY, args.getLauncherConfig().getDockerImage())); final JsonNode fullConfig = secretsHydrator.hydrate(args.getConnectionConfiguration().getConnectionConfiguration()); final StandardCheckConnectionInput input = new StandardCheckConnectionInput() diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowImpl.java index 5a00a44f6c11c1..57f3c441bc69c3 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowImpl.java @@ -4,6 +4,7 @@ package io.airbyte.workers.temporal.check.connection; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DOCKER_IMAGE_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME; @@ -35,7 +36,8 @@ public class CheckConnectionWorkflowImpl implements CheckConnectionWorkflow { public ConnectorJobOutput run(final JobRunConfig jobRunConfig, final IntegrationLauncherConfig launcherConfig, final StandardCheckConnectionInput connectionConfiguration) { - ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobRunConfig.getJobId(), DOCKER_IMAGE_KEY, launcherConfig.getDockerImage())); + ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, jobRunConfig.getAttemptId(), JOB_ID_KEY, jobRunConfig.getJobId(), DOCKER_IMAGE_KEY, + launcherConfig.getDockerImage())); final CheckConnectionInput checkInput = new CheckConnectionInput(jobRunConfig, launcherConfig, connectionConfiguration); final int jobOutputVersion = diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java index 4626ffc1aebbd5..9ba130aed13129 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java @@ -5,6 +5,7 @@ package io.airbyte.workers.temporal.discover.catalog; import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DOCKER_IMAGE_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; @@ -91,7 +92,8 @@ public DiscoverCatalogActivityImpl(@Named("discoverWorkerConfigs") final WorkerC public ConnectorJobOutput run(final JobRunConfig jobRunConfig, final IntegrationLauncherConfig launcherConfig, final StandardDiscoverCatalogInput config) { - ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobRunConfig.getJobId(), DOCKER_IMAGE_KEY, launcherConfig.getDockerImage())); + ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, jobRunConfig.getAttemptId(), JOB_ID_KEY, jobRunConfig.getJobId(), DOCKER_IMAGE_KEY, + launcherConfig.getDockerImage())); final JsonNode fullConfig = secretsHydrator.hydrate(config.getConnectionConfiguration()); final StandardDiscoverCatalogInput input = new StandardDiscoverCatalogInput() diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogWorkflowImpl.java index 127ed8162d96ed..ea5dab612fdee9 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogWorkflowImpl.java @@ -4,6 +4,7 @@ package io.airbyte.workers.temporal.discover.catalog; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DOCKER_IMAGE_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME; @@ -28,7 +29,8 @@ public class DiscoverCatalogWorkflowImpl implements DiscoverCatalogWorkflow { public ConnectorJobOutput run(final JobRunConfig jobRunConfig, final IntegrationLauncherConfig launcherConfig, final StandardDiscoverCatalogInput config) { - ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobRunConfig.getJobId(), DOCKER_IMAGE_KEY, launcherConfig.getDockerImage())); + ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, jobRunConfig.getAttemptId(), JOB_ID_KEY, jobRunConfig.getJobId(), DOCKER_IMAGE_KEY, + launcherConfig.getDockerImage())); return activity.run(jobRunConfig, launcherConfig, config); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index 17486aaa8b0a64..fc8de77ff4a7e1 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -4,6 +4,7 @@ package io.airbyte.workers.temporal.scheduling; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME; @@ -367,6 +368,7 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput, final int maxAttempt = configFetchActivity.getMaxAttempt().getMaxAttempt(); final int attemptNumber = connectionUpdaterInput.getAttemptNumber(); + ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptNumber)); final FailureType failureType = standardSyncOutput != null ? standardSyncOutput.getFailures().isEmpty() ? null : standardSyncOutput.getFailures().get(0).getFailureType() diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java index badc558e6dc8c5..de7c26da10f514 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java @@ -5,6 +5,7 @@ package io.airbyte.workers.temporal.scheduling.activities; import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; import datadog.trace.api.Trace; @@ -42,7 +43,7 @@ public GenerateInputActivityImpl(final JobPersistence jobPersistence) { @Override public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { try { - ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, input.getJobId())); + ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptId(), JOB_ID_KEY, input.getJobId())); final long jobId = input.getJobId(); final int attempt = input.getAttemptId(); final JobSyncConfig config; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index 34026e1c864d9f..acadfb2bc98a19 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -6,6 +6,7 @@ import static io.airbyte.config.JobConfig.ConfigType.SYNC; import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; import static io.airbyte.persistence.job.models.AttemptStatus.FAILED; @@ -220,7 +221,7 @@ public AttemptNumberCreationOutput createNewAttemptNumber(final AttemptCreationI @Override public void jobSuccess(final JobSuccessInput input) { try { - ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, input.getJobId())); + ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptId(), JOB_ID_KEY, input.getJobId())); final long jobId = input.getJobId(); final int attemptId = input.getAttemptId(); @@ -247,7 +248,8 @@ public void jobSuccess(final JobSuccessInput input) { @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) @Override public void jobSuccessWithAttemptNumber(final JobSuccessInputWithAttemptNumber input) { - ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, input.getConnectionId(), JOB_ID_KEY, input.getJobId())); + ApmTraceUtils.addTagsToTrace( + Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptNumber(), CONNECTION_ID_KEY, input.getConnectionId(), JOB_ID_KEY, input.getJobId())); jobSuccess(new JobSuccessInput( input.getJobId(), input.getAttemptNumber(), @@ -267,7 +269,7 @@ public void jobFailure(final JobFailureInput input) { emitJobIdToReleaseStagesMetric(OssMetricsRegistry.JOB_FAILED_BY_RELEASE_STAGE, jobId); final UUID connectionId = UUID.fromString(job.getScope()); - ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, connectionId, JOB_ID_KEY, jobId)); + ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptNumber(), CONNECTION_ID_KEY, connectionId, JOB_ID_KEY, jobId)); final JobSyncConfig jobSyncConfig = job.getConfig().getSync(); final String sourceDockerImage = jobSyncConfig != null ? jobSyncConfig.getSourceDockerImage() : null; final String destinationDockerImage = jobSyncConfig != null ? jobSyncConfig.getDestinationDockerImage() : null; @@ -285,7 +287,7 @@ public void jobFailure(final JobFailureInput input) { @Override public void attemptFailure(final AttemptFailureInput input) { try { - ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, input.getJobId())); + ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptId(), JOB_ID_KEY, input.getJobId())); final int attemptId = input.getAttemptId(); final long jobId = input.getJobId(); @@ -313,7 +315,8 @@ public void attemptFailure(final AttemptFailureInput input) { @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) @Override public void attemptFailureWithAttemptNumber(final AttemptNumberFailureInput input) { - ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, input.getConnectionId(), JOB_ID_KEY, input.getJobId())); + ApmTraceUtils.addTagsToTrace( + Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptNumber(), CONNECTION_ID_KEY, input.getConnectionId(), JOB_ID_KEY, input.getJobId())); attemptFailure(new AttemptFailureInput( input.getJobId(), input.getAttemptNumber(), @@ -326,7 +329,7 @@ public void attemptFailureWithAttemptNumber(final AttemptNumberFailureInput inpu @Override public void jobCancelled(final JobCancelledInput input) { try { - ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, input.getJobId())); + ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptId(), JOB_ID_KEY, input.getJobId())); final long jobId = input.getJobId(); final int attemptId = input.getAttemptId(); @@ -347,7 +350,8 @@ public void jobCancelled(final JobCancelledInput input) { @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) @Override public void jobCancelledWithAttemptNumber(final JobCancelledInputWithAttemptNumber input) { - ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, input.getConnectionId(), JOB_ID_KEY, input.getJobId())); + ApmTraceUtils.addTagsToTrace( + Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptNumber(), CONNECTION_ID_KEY, input.getConnectionId(), JOB_ID_KEY, input.getJobId())); jobCancelled(new JobCancelledInput( input.getJobId(), diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java index a80a1389cd52d1..340eafae4d45ce 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java @@ -5,6 +5,7 @@ package io.airbyte.workers.temporal.spec; import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DOCKER_IMAGE_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; @@ -79,7 +80,8 @@ public SpecActivityImpl(@Named("specWorkerConfigs") final WorkerConfigs workerCo @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) @Override public ConnectorJobOutput run(final JobRunConfig jobRunConfig, final IntegrationLauncherConfig launcherConfig) { - ApmTraceUtils.addTagsToTrace(Map.of(DOCKER_IMAGE_KEY, launcherConfig.getDockerImage(), JOB_ID_KEY, jobRunConfig.getJobId())); + ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, jobRunConfig.getAttemptId(), DOCKER_IMAGE_KEY, launcherConfig.getDockerImage(), + JOB_ID_KEY, jobRunConfig.getJobId())); final Supplier inputSupplier = () -> new JobGetSpecConfig().withDockerImage(launcherConfig.getDockerImage()); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecWorkflowImpl.java index a13493f56ae1db..5eb03717daebf4 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecWorkflowImpl.java @@ -4,6 +4,7 @@ package io.airbyte.workers.temporal.spec; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DOCKER_IMAGE_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME; @@ -25,7 +26,8 @@ public class SpecWorkflowImpl implements SpecWorkflow { @Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME) @Override public ConnectorJobOutput run(final JobRunConfig jobRunConfig, final IntegrationLauncherConfig launcherConfig) { - ApmTraceUtils.addTagsToTrace(Map.of(DOCKER_IMAGE_KEY, launcherConfig.getDockerImage(), JOB_ID_KEY, jobRunConfig.getJobId())); + ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, jobRunConfig.getAttemptId(), DOCKER_IMAGE_KEY, launcherConfig.getDockerImage(), + JOB_ID_KEY, jobRunConfig.getJobId())); return activity.run(jobRunConfig, launcherConfig); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java index df06d31662c080..768f397ed86f3f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java @@ -5,6 +5,7 @@ package io.airbyte.workers.temporal.sync; import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; @@ -95,7 +96,8 @@ public Void run(final JobRunConfig jobRunConfig, final ResourceRequirements resourceRequirements, final OperatorDbtInput input) { ApmTraceUtils.addTagsToTrace( - Map.of(JOB_ID_KEY, jobRunConfig.getJobId(), DESTINATION_DOCKER_IMAGE_KEY, destinationLauncherConfig.getDockerImage())); + Map.of(ATTEMPT_NUMBER_KEY, jobRunConfig.getAttemptId(), JOB_ID_KEY, jobRunConfig.getJobId(), DESTINATION_DOCKER_IMAGE_KEY, + destinationLauncherConfig.getDockerImage())); final ActivityExecutionContext context = Activity.getExecutionContext(); return temporalUtils.withBackgroundHeartbeat( () -> { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index 71d058051e0a20..b37e41e8aa33bd 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -5,6 +5,7 @@ package io.airbyte.workers.temporal.sync; import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; @@ -99,7 +100,8 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, final IntegrationLauncherConfig destinationLauncherConfig, final NormalizationInput input) { ApmTraceUtils.addTagsToTrace( - Map.of(JOB_ID_KEY, jobRunConfig.getJobId(), DESTINATION_DOCKER_IMAGE_KEY, destinationLauncherConfig.getDockerImage())); + Map.of(ATTEMPT_NUMBER_KEY, jobRunConfig.getAttemptId(), JOB_ID_KEY, jobRunConfig.getJobId(), DESTINATION_DOCKER_IMAGE_KEY, + destinationLauncherConfig.getDockerImage())); final ActivityExecutionContext context = Activity.getExecutionContext(); return temporalUtils.withBackgroundHeartbeat(() -> { final var fullDestinationConfig = secretsHydrator.hydrate(input.getDestinationConfiguration()); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java index ad6a37961b069f..38bf7111eb25a4 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java @@ -5,6 +5,7 @@ package io.airbyte.workers.temporal.sync; import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; import datadog.trace.api.Trace; @@ -29,7 +30,7 @@ public class NormalizationSummaryCheckActivityImpl implements NormalizationSumma private final AirbyteApiClient airbyteApiClient; - public NormalizationSummaryCheckActivityImpl(AirbyteApiClient airbyteApiClient) { + public NormalizationSummaryCheckActivityImpl(final AirbyteApiClient airbyteApiClient) { this.airbyteApiClient = airbyteApiClient; } @@ -37,7 +38,7 @@ public NormalizationSummaryCheckActivityImpl(AirbyteApiClient airbyteApiClient) @Override @SuppressWarnings("PMD.AvoidLiteralsInIfCondition") public boolean shouldRunNormalization(final Long jobId, final Long attemptNumber, final Optional numCommittedRecords) { - ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobId)); + ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptNumber, JOB_ID_KEY, jobId)); // if the count of committed records for this attempt is > 0 OR if it is null, // then we should run normalization @@ -48,7 +49,7 @@ public boolean shouldRunNormalization(final Long jobId, final Long attemptNumber final AttemptNormalizationStatusReadList AttemptNormalizationStatusReadList; try { AttemptNormalizationStatusReadList = airbyteApiClient.getJobsApi().getAttemptNormalizationStatusesForJob(new JobIdRequestBody().id(jobId)); - } catch (ApiException e) { + } catch (final ApiException e) { throw Activity.wrap(e); } final AtomicLong totalRecordsCommitted = new AtomicLong(0L); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index 54bc24a4aa6f64..b88b16404388e2 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -5,6 +5,7 @@ package io.airbyte.workers.temporal.sync; import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.REPLICATION_BYTES_SYNCED_KEY; @@ -137,8 +138,9 @@ public StandardSyncOutput replicate(final JobRunConfig jobRunConfig, final IntegrationLauncherConfig destinationLauncherConfig, final StandardSyncInput syncInput, @Nullable final String taskQueue) { - ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobRunConfig.getJobId(), DESTINATION_DOCKER_IMAGE_KEY, - destinationLauncherConfig.getDockerImage(), SOURCE_DOCKER_IMAGE_KEY, sourceLauncherConfig.getDockerImage())); + ApmTraceUtils + .addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, jobRunConfig.getAttemptId(), JOB_ID_KEY, jobRunConfig.getJobId(), DESTINATION_DOCKER_IMAGE_KEY, + destinationLauncherConfig.getDockerImage(), SOURCE_DOCKER_IMAGE_KEY, sourceLauncherConfig.getDockerImage())); final ActivityExecutionContext context = Activity.getExecutionContext(); return temporalUtils.withBackgroundHeartbeat( () -> { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java index d75dec375dc17a..945c0abe2bc78d 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java @@ -4,6 +4,7 @@ package io.airbyte.workers.temporal.sync; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; @@ -64,7 +65,8 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, final UUID connectionId) { ApmTraceUtils - .addTagsToTrace(Map.of(CONNECTION_ID_KEY, connectionId.toString(), JOB_ID_KEY, jobRunConfig.getJobId(), SOURCE_DOCKER_IMAGE_KEY, + .addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, jobRunConfig.getAttemptId(), CONNECTION_ID_KEY, connectionId.toString(), JOB_ID_KEY, + jobRunConfig.getJobId(), SOURCE_DOCKER_IMAGE_KEY, sourceLauncherConfig.getDockerImage(), DESTINATION_DOCKER_IMAGE_KEY, destinationLauncherConfig.getDockerImage())); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/tracing/TemporalSdkInterceptor.java b/airbyte-workers/src/main/java/io/airbyte/workers/tracing/TemporalSdkInterceptor.java index 8ec652f10da85b..fbdbbc330d5614 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/tracing/TemporalSdkInterceptor.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/tracing/TemporalSdkInterceptor.java @@ -29,7 +29,7 @@ public class TemporalSdkInterceptor implements TraceInterceptor { /** * Error message tag key name that contains the Temporal exit error message. */ - static final String ERROR_MESSAGE_TAG_KEY = "error.message"; + static final String ERROR_MESSAGE_TAG_KEY = "error.msg"; /** * Temporal exit error message text. diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/tracing/TemporalSdkInterceptorTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/tracing/TemporalSdkInterceptorTest.java index 811fef3bdd391d..6cb45216fcd844 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/tracing/TemporalSdkInterceptorTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/tracing/TemporalSdkInterceptorTest.java @@ -32,7 +32,7 @@ void testOnTraceComplete() { final var otherError = new DummySpan(); otherError.setError(true); otherError.setOperationName(WORKFLOW_TRACE_OPERATION_NAME); - otherError.setTag("error.message", "some other error"); + otherError.setTag(ERROR_MESSAGE_TAG_KEY, "some other error"); final var temporalExitMsgOperationNameError = new DummySpan(); temporalExitMsgOperationNameError.setError(true);