Skip to content

Commit

Permalink
Add attempt number to trace tags (#19241)
Browse files Browse the repository at this point in the history
* Add attempt number to trace tags

* Use correct tag key for error suppression
  • Loading branch information
jdpgrailsdev committed Nov 9, 2022
1 parent e029353 commit 0e036db
Show file tree
Hide file tree
Showing 17 changed files with 57 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ private ApmTraceConstants() {}
*/
public static final class Tags {

/**
* Name of the APM trace tag that holds the attempt number value associated with the trace.
*/
public static final String ATTEMPT_NUMBER_KEY = "attempt_number";

/**
* Name of the APM trace tag that holds the destination Docker image value associated with the
* trace.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.workers.temporal.check.connection;

import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DOCKER_IMAGE_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;

Expand Down Expand Up @@ -85,7 +86,8 @@ public CheckConnectionActivityImpl(@Named("checkWorkerConfigs") final WorkerConf
@Override
public ConnectorJobOutput runWithJobOutput(final CheckConnectionInput args) {
ApmTraceUtils
.addTagsToTrace(Map.of(JOB_ID_KEY, args.getJobRunConfig().getJobId(), DOCKER_IMAGE_KEY, args.getLauncherConfig().getDockerImage()));
.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, args.getJobRunConfig().getAttemptId(), JOB_ID_KEY, args.getJobRunConfig().getJobId(),
DOCKER_IMAGE_KEY, args.getLauncherConfig().getDockerImage()));
final JsonNode fullConfig = secretsHydrator.hydrate(args.getConnectionConfiguration().getConnectionConfiguration());

final StandardCheckConnectionInput input = new StandardCheckConnectionInput()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.workers.temporal.check.connection;

import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DOCKER_IMAGE_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME;
Expand Down Expand Up @@ -35,7 +36,8 @@ public class CheckConnectionWorkflowImpl implements CheckConnectionWorkflow {
public ConnectorJobOutput run(final JobRunConfig jobRunConfig,
final IntegrationLauncherConfig launcherConfig,
final StandardCheckConnectionInput connectionConfiguration) {
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobRunConfig.getJobId(), DOCKER_IMAGE_KEY, launcherConfig.getDockerImage()));
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, jobRunConfig.getAttemptId(), JOB_ID_KEY, jobRunConfig.getJobId(), DOCKER_IMAGE_KEY,
launcherConfig.getDockerImage()));
final CheckConnectionInput checkInput = new CheckConnectionInput(jobRunConfig, launcherConfig, connectionConfiguration);

final int jobOutputVersion =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.workers.temporal.discover.catalog;

import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DOCKER_IMAGE_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;

Expand Down Expand Up @@ -91,7 +92,8 @@ public DiscoverCatalogActivityImpl(@Named("discoverWorkerConfigs") final WorkerC
public ConnectorJobOutput run(final JobRunConfig jobRunConfig,
final IntegrationLauncherConfig launcherConfig,
final StandardDiscoverCatalogInput config) {
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobRunConfig.getJobId(), DOCKER_IMAGE_KEY, launcherConfig.getDockerImage()));
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, jobRunConfig.getAttemptId(), JOB_ID_KEY, jobRunConfig.getJobId(), DOCKER_IMAGE_KEY,
launcherConfig.getDockerImage()));
final JsonNode fullConfig = secretsHydrator.hydrate(config.getConnectionConfiguration());

final StandardDiscoverCatalogInput input = new StandardDiscoverCatalogInput()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.workers.temporal.discover.catalog;

import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DOCKER_IMAGE_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME;
Expand All @@ -28,7 +29,8 @@ public class DiscoverCatalogWorkflowImpl implements DiscoverCatalogWorkflow {
public ConnectorJobOutput run(final JobRunConfig jobRunConfig,
final IntegrationLauncherConfig launcherConfig,
final StandardDiscoverCatalogInput config) {
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobRunConfig.getJobId(), DOCKER_IMAGE_KEY, launcherConfig.getDockerImage()));
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, jobRunConfig.getAttemptId(), JOB_ID_KEY, jobRunConfig.getJobId(), DOCKER_IMAGE_KEY,
launcherConfig.getDockerImage()));
return activity.run(jobRunConfig, launcherConfig, config);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.workers.temporal.scheduling;

import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME;
Expand Down Expand Up @@ -367,6 +368,7 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput,

final int maxAttempt = configFetchActivity.getMaxAttempt().getMaxAttempt();
final int attemptNumber = connectionUpdaterInput.getAttemptNumber();
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptNumber));

final FailureType failureType =
standardSyncOutput != null ? standardSyncOutput.getFailures().isEmpty() ? null : standardSyncOutput.getFailures().get(0).getFailureType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.workers.temporal.scheduling.activities;

import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;

import datadog.trace.api.Trace;
Expand Down Expand Up @@ -42,7 +43,7 @@ public GenerateInputActivityImpl(final JobPersistence jobPersistence) {
@Override
public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) {
try {
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, input.getJobId()));
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptId(), JOB_ID_KEY, input.getJobId()));
final long jobId = input.getJobId();
final int attempt = input.getAttemptId();
final JobSyncConfig config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import static io.airbyte.config.JobConfig.ConfigType.SYNC;
import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.persistence.job.models.AttemptStatus.FAILED;
Expand Down Expand Up @@ -220,7 +221,7 @@ public AttemptNumberCreationOutput createNewAttemptNumber(final AttemptCreationI
@Override
public void jobSuccess(final JobSuccessInput input) {
try {
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, input.getJobId()));
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptId(), JOB_ID_KEY, input.getJobId()));

final long jobId = input.getJobId();
final int attemptId = input.getAttemptId();
Expand All @@ -247,7 +248,8 @@ public void jobSuccess(final JobSuccessInput input) {
@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public void jobSuccessWithAttemptNumber(final JobSuccessInputWithAttemptNumber input) {
ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, input.getConnectionId(), JOB_ID_KEY, input.getJobId()));
ApmTraceUtils.addTagsToTrace(
Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptNumber(), CONNECTION_ID_KEY, input.getConnectionId(), JOB_ID_KEY, input.getJobId()));
jobSuccess(new JobSuccessInput(
input.getJobId(),
input.getAttemptNumber(),
Expand All @@ -267,7 +269,7 @@ public void jobFailure(final JobFailureInput input) {
emitJobIdToReleaseStagesMetric(OssMetricsRegistry.JOB_FAILED_BY_RELEASE_STAGE, jobId);

final UUID connectionId = UUID.fromString(job.getScope());
ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, connectionId, JOB_ID_KEY, jobId));
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptNumber(), CONNECTION_ID_KEY, connectionId, JOB_ID_KEY, jobId));
final JobSyncConfig jobSyncConfig = job.getConfig().getSync();
final String sourceDockerImage = jobSyncConfig != null ? jobSyncConfig.getSourceDockerImage() : null;
final String destinationDockerImage = jobSyncConfig != null ? jobSyncConfig.getDestinationDockerImage() : null;
Expand All @@ -285,7 +287,7 @@ public void jobFailure(final JobFailureInput input) {
@Override
public void attemptFailure(final AttemptFailureInput input) {
try {
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, input.getJobId()));
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptId(), JOB_ID_KEY, input.getJobId()));

final int attemptId = input.getAttemptId();
final long jobId = input.getJobId();
Expand Down Expand Up @@ -313,7 +315,8 @@ public void attemptFailure(final AttemptFailureInput input) {
@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public void attemptFailureWithAttemptNumber(final AttemptNumberFailureInput input) {
ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, input.getConnectionId(), JOB_ID_KEY, input.getJobId()));
ApmTraceUtils.addTagsToTrace(
Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptNumber(), CONNECTION_ID_KEY, input.getConnectionId(), JOB_ID_KEY, input.getJobId()));
attemptFailure(new AttemptFailureInput(
input.getJobId(),
input.getAttemptNumber(),
Expand All @@ -326,7 +329,7 @@ public void attemptFailureWithAttemptNumber(final AttemptNumberFailureInput inpu
@Override
public void jobCancelled(final JobCancelledInput input) {
try {
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, input.getJobId()));
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptId(), JOB_ID_KEY, input.getJobId()));

final long jobId = input.getJobId();
final int attemptId = input.getAttemptId();
Expand All @@ -347,7 +350,8 @@ public void jobCancelled(final JobCancelledInput input) {
@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public void jobCancelledWithAttemptNumber(final JobCancelledInputWithAttemptNumber input) {
ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, input.getConnectionId(), JOB_ID_KEY, input.getJobId()));
ApmTraceUtils.addTagsToTrace(
Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptNumber(), CONNECTION_ID_KEY, input.getConnectionId(), JOB_ID_KEY, input.getJobId()));

jobCancelled(new JobCancelledInput(
input.getJobId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.workers.temporal.spec;

import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DOCKER_IMAGE_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;

Expand Down Expand Up @@ -79,7 +80,8 @@ public SpecActivityImpl(@Named("specWorkerConfigs") final WorkerConfigs workerCo
@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public ConnectorJobOutput run(final JobRunConfig jobRunConfig, final IntegrationLauncherConfig launcherConfig) {
ApmTraceUtils.addTagsToTrace(Map.of(DOCKER_IMAGE_KEY, launcherConfig.getDockerImage(), JOB_ID_KEY, jobRunConfig.getJobId()));
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, jobRunConfig.getAttemptId(), DOCKER_IMAGE_KEY, launcherConfig.getDockerImage(),
JOB_ID_KEY, jobRunConfig.getJobId()));

final Supplier<JobGetSpecConfig> inputSupplier = () -> new JobGetSpecConfig().withDockerImage(launcherConfig.getDockerImage());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.workers.temporal.spec;

import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DOCKER_IMAGE_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME;
Expand All @@ -25,7 +26,8 @@ public class SpecWorkflowImpl implements SpecWorkflow {
@Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME)
@Override
public ConnectorJobOutput run(final JobRunConfig jobRunConfig, final IntegrationLauncherConfig launcherConfig) {
ApmTraceUtils.addTagsToTrace(Map.of(DOCKER_IMAGE_KEY, launcherConfig.getDockerImage(), JOB_ID_KEY, jobRunConfig.getJobId()));
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, jobRunConfig.getAttemptId(), DOCKER_IMAGE_KEY, launcherConfig.getDockerImage(),
JOB_ID_KEY, jobRunConfig.getJobId()));
return activity.run(jobRunConfig, launcherConfig);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.workers.temporal.sync;

import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;

Expand Down Expand Up @@ -95,7 +96,8 @@ public Void run(final JobRunConfig jobRunConfig,
final ResourceRequirements resourceRequirements,
final OperatorDbtInput input) {
ApmTraceUtils.addTagsToTrace(
Map.of(JOB_ID_KEY, jobRunConfig.getJobId(), DESTINATION_DOCKER_IMAGE_KEY, destinationLauncherConfig.getDockerImage()));
Map.of(ATTEMPT_NUMBER_KEY, jobRunConfig.getAttemptId(), JOB_ID_KEY, jobRunConfig.getJobId(), DESTINATION_DOCKER_IMAGE_KEY,
destinationLauncherConfig.getDockerImage()));
final ActivityExecutionContext context = Activity.getExecutionContext();
return temporalUtils.withBackgroundHeartbeat(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.workers.temporal.sync;

import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;

Expand Down Expand Up @@ -99,7 +100,8 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig,
final IntegrationLauncherConfig destinationLauncherConfig,
final NormalizationInput input) {
ApmTraceUtils.addTagsToTrace(
Map.of(JOB_ID_KEY, jobRunConfig.getJobId(), DESTINATION_DOCKER_IMAGE_KEY, destinationLauncherConfig.getDockerImage()));
Map.of(ATTEMPT_NUMBER_KEY, jobRunConfig.getAttemptId(), JOB_ID_KEY, jobRunConfig.getJobId(), DESTINATION_DOCKER_IMAGE_KEY,
destinationLauncherConfig.getDockerImage()));
final ActivityExecutionContext context = Activity.getExecutionContext();
return temporalUtils.withBackgroundHeartbeat(() -> {
final var fullDestinationConfig = secretsHydrator.hydrate(input.getDestinationConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.workers.temporal.sync;

import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;

import datadog.trace.api.Trace;
Expand All @@ -29,15 +30,15 @@ public class NormalizationSummaryCheckActivityImpl implements NormalizationSumma

private final AirbyteApiClient airbyteApiClient;

public NormalizationSummaryCheckActivityImpl(AirbyteApiClient airbyteApiClient) {
public NormalizationSummaryCheckActivityImpl(final AirbyteApiClient airbyteApiClient) {
this.airbyteApiClient = airbyteApiClient;
}

@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
@SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
public boolean shouldRunNormalization(final Long jobId, final Long attemptNumber, final Optional<Long> numCommittedRecords) {
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobId));
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptNumber, JOB_ID_KEY, jobId));

// if the count of committed records for this attempt is > 0 OR if it is null,
// then we should run normalization
Expand All @@ -48,7 +49,7 @@ public boolean shouldRunNormalization(final Long jobId, final Long attemptNumber
final AttemptNormalizationStatusReadList AttemptNormalizationStatusReadList;
try {
AttemptNormalizationStatusReadList = airbyteApiClient.getJobsApi().getAttemptNormalizationStatusesForJob(new JobIdRequestBody().id(jobId));
} catch (ApiException e) {
} catch (final ApiException e) {
throw Activity.wrap(e);
}
final AtomicLong totalRecordsCommitted = new AtomicLong(0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.workers.temporal.sync;

import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.REPLICATION_BYTES_SYNCED_KEY;
Expand Down Expand Up @@ -137,8 +138,9 @@ public StandardSyncOutput replicate(final JobRunConfig jobRunConfig,
final IntegrationLauncherConfig destinationLauncherConfig,
final StandardSyncInput syncInput,
@Nullable final String taskQueue) {
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobRunConfig.getJobId(), DESTINATION_DOCKER_IMAGE_KEY,
destinationLauncherConfig.getDockerImage(), SOURCE_DOCKER_IMAGE_KEY, sourceLauncherConfig.getDockerImage()));
ApmTraceUtils
.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, jobRunConfig.getAttemptId(), JOB_ID_KEY, jobRunConfig.getJobId(), DESTINATION_DOCKER_IMAGE_KEY,
destinationLauncherConfig.getDockerImage(), SOURCE_DOCKER_IMAGE_KEY, sourceLauncherConfig.getDockerImage()));
final ActivityExecutionContext context = Activity.getExecutionContext();
return temporalUtils.withBackgroundHeartbeat(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.workers.temporal.sync;

import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
Expand Down Expand Up @@ -64,7 +65,8 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,
final UUID connectionId) {

ApmTraceUtils
.addTagsToTrace(Map.of(CONNECTION_ID_KEY, connectionId.toString(), JOB_ID_KEY, jobRunConfig.getJobId(), SOURCE_DOCKER_IMAGE_KEY,
.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, jobRunConfig.getAttemptId(), CONNECTION_ID_KEY, connectionId.toString(), JOB_ID_KEY,
jobRunConfig.getJobId(), SOURCE_DOCKER_IMAGE_KEY,
sourceLauncherConfig.getDockerImage(),
DESTINATION_DOCKER_IMAGE_KEY, destinationLauncherConfig.getDockerImage()));

Expand Down
Loading

0 comments on commit 0e036db

Please sign in to comment.