Skip to content

Commit

Permalink
Add AttemptContext (#6399)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosusnp committed May 8, 2023
1 parent 70df394 commit 1305bc9
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 20 deletions.
1 change: 1 addition & 0 deletions airbyte-commons-worker/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ dependencies {
testImplementation libs.bundles.micronaut.test
testImplementation libs.json.path
testImplementation libs.mockito.inline
testImplementation(variantOf(libs.opentracing.util.test) { classifier('tests') })
testImplementation libs.postgresql
testImplementation libs.platform.testcontainers.postgresql
testImplementation libs.jmh.core
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.context;

import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;

import io.airbyte.metrics.lib.ApmTraceUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
* Context of an Attempt.
*/
public record AttemptContext(UUID connectionId, Long jobId, Integer attemptNumber) {

/**
* Update the current trace with the ids from the context.
*/
public void addTagsToTrace() {
final Map<String, Object> tags = new HashMap<>();
if (connectionId != null) {
tags.put(CONNECTION_ID_KEY, connectionId);
}
if (jobId != null) {
tags.put(JOB_ID_KEY, jobId);
}
if (attemptNumber != null) {
tags.put(ATTEMPT_NUMBER_KEY, attemptNumber);
}
ApmTraceUtils.addTagsToTrace(tags);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.context;

import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceUtils.formatTag;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracerTestUtil;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class AttemptContextTest {

Span span;
Tracer tracer;

@BeforeEach
void beforeEach() {
span = mock(Span.class);
tracer = mock(Tracer.class);
when(tracer.activeSpan()).thenReturn(span);
GlobalTracerTestUtil.setGlobalTracerUnconditionally(tracer);
}

@Test
void addTagsToTrace() {
final UUID connectionId = UUID.randomUUID();
final Long jobId = 5L;
final Integer attemptNumber = 3;
new AttemptContext(connectionId, jobId, attemptNumber).addTagsToTrace();

verifySpanSetTag(CONNECTION_ID_KEY, connectionId);
verifySpanSetTag(JOB_ID_KEY, jobId);
verifySpanSetTag(ATTEMPT_NUMBER_KEY, attemptNumber);
}

@Test
void addTagsToTraceShouldIgnoreNullValues() {
final UUID connectionId = UUID.randomUUID();
final Long jobId = 3L;
final Integer attemptNumber = 7;

new AttemptContext(connectionId, null, null).addTagsToTrace();
verifySpanSetTag(CONNECTION_ID_KEY, connectionId);
reset(span);

new AttemptContext(null, jobId, null).addTagsToTrace();
verifySpanSetTag(JOB_ID_KEY, jobId);
reset(span);

new AttemptContext(null, null, attemptNumber).addTagsToTrace();
verifySpanSetTag(ATTEMPT_NUMBER_KEY, attemptNumber);
reset(span);
}

private void verifySpanSetTag(final String tag, final Object value) {
verify(span).setTag(formatTag(tag), value.toString());
}

}
2 changes: 1 addition & 1 deletion airbyte-metrics/metrics-lib/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ dependencies {
testImplementation project(':airbyte-config:config-persistence')
testImplementation project(':airbyte-test-utils')
testImplementation libs.platform.testcontainers.postgresql
testImplementation "io.opentracing:opentracing-util:0.33.0:tests"
testImplementation(variantOf(libs.opentracing.util.test) { classifier('tests') })
}

Task publishArtifactsTask = getPublishArtifactsTask("$rootProject.ext.version", project)
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,11 @@ public enum OssMetricsRegistry implements MetricsRegistry {
SCHEMA_CHANGE_AUTO_PROPAGATED(MetricEmittingApps.SERVER,
"schema_change_auto_propagated",
"a schema change have been propagated",
MetricTags.CONNECTION_ID);
MetricTags.CONNECTION_ID),

INCONSISTENT_ACTIVITY_INPUT(MetricEmittingApps.WORKER,
"inconsistent_activity_input",
"whenever we detect a mismatch between the input and the actual config");

private final MetricEmittingApp application;
private final String metricName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@

import static io.airbyte.config.JobConfig.ConfigType.SYNC;
import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.FAILURE_ORIGINS_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.FAILURE_TYPES_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.persistence.job.models.AttemptStatus.FAILED;

import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -63,6 +60,7 @@
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.JobStatus;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.context.AttemptContext;
import io.airbyte.workers.helper.FailureHelper;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.util.CollectionUtils;
Expand Down Expand Up @@ -169,9 +167,9 @@ private static String parseAttemptNumberOrNull(final int attemptNumber) {
@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public JobCreationOutput createNewJob(final JobCreationInput input) {
try {
ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, input.getConnectionId()));
new AttemptContext(input.getConnectionId(), null, null).addTagsToTrace();

try {
// Fail non-terminal jobs first to prevent this activity from repeatedly trying to create a new job
// and failing, potentially resulting in the workflow ending up in a quarantined state.
// Another non-terminal job is not expected to exist at this point in the normal case, but this
Expand Down Expand Up @@ -244,9 +242,10 @@ private void emitSrcIdDstIdToReleaseStagesMetric(final UUID srcId, final UUID ds
@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public AttemptNumberCreationOutput createNewAttemptNumber(final AttemptCreationInput input) throws RetryableException {
new AttemptContext(null, input.getJobId(), null).addTagsToTrace();

try {
final long jobId = input.getJobId();
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobId));
final Job job = jobPersistence.getJob(jobId);

final Path jobRoot = TemporalUtils.getJobRoot(workspaceRoot, String.valueOf(jobId), job.getAttemptsCount());
Expand All @@ -265,12 +264,11 @@ public AttemptNumberCreationOutput createNewAttemptNumber(final AttemptCreationI
@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public void jobSuccessWithAttemptNumber(final JobSuccessInputWithAttemptNumber input) {
ApmTraceUtils.addTagsToTrace(
Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptNumber(), CONNECTION_ID_KEY, input.getConnectionId(), JOB_ID_KEY, input.getJobId()));
new AttemptContext(input.getConnectionId(), input.getJobId(), input.getAttemptNumber()).addTagsToTrace();

try {
final long jobId = input.getJobId();
final int attemptNumber = input.getAttemptNumber();
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptNumber, JOB_ID_KEY, jobId));

if (input.getStandardSyncOutput() != null) {
final JobOutput jobOutput = new JobOutput().withSync(input.getStandardSyncOutput());
Expand All @@ -294,6 +292,8 @@ public void jobSuccessWithAttemptNumber(final JobSuccessInputWithAttemptNumber i
@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public void jobFailure(final JobFailureInput input) {
new AttemptContext(input.getConnectionId(), input.getJobId(), input.getAttemptNumber()).addTagsToTrace();

try {
final long jobId = input.getJobId();
jobPersistence.failJob(jobId);
Expand All @@ -303,7 +303,11 @@ public void jobFailure(final JobFailureInput input) {
emitJobIdToReleaseStagesMetric(OssMetricsRegistry.JOB_FAILED_BY_RELEASE_STAGE, jobId);

final UUID connectionId = UUID.fromString(job.getScope());
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptNumber(), CONNECTION_ID_KEY, connectionId, JOB_ID_KEY, jobId));
if (!connectionId.equals(input.getConnectionId())) {
log.warn("inconsistent connectionId for jobId '{}' (input:'{}', db:'{}')", jobId, input.getConnectionId(), connectionId);
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.INCONSISTENT_ACTIVITY_INPUT, 1);
}

final JobSyncConfig jobSyncConfig = job.getConfig().getSync();
final String sourceDockerImage;
final String destinationDockerImage;
Expand All @@ -329,15 +333,13 @@ public void jobFailure(final JobFailureInput input) {
@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public void attemptFailureWithAttemptNumber(final AttemptNumberFailureInput input) {
ApmTraceUtils.addTagsToTrace(
Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptNumber(), CONNECTION_ID_KEY, input.getConnectionId(), JOB_ID_KEY, input.getJobId()));
new AttemptContext(input.getConnectionId(), input.getJobId(), input.getAttemptNumber()).addTagsToTrace();

try {
final int attemptNumber = input.getAttemptNumber();
final long jobId = input.getJobId();
final AttemptFailureSummary failureSummary = input.getAttemptFailureSummary();

ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptNumber, JOB_ID_KEY, jobId));
traceFailures(failureSummary);

jobPersistence.failAttempt(jobId, attemptNumber);
Expand All @@ -358,13 +360,11 @@ public void attemptFailureWithAttemptNumber(final AttemptNumberFailureInput inpu
@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public void jobCancelledWithAttemptNumber(final JobCancelledInputWithAttemptNumber input) {
ApmTraceUtils.addTagsToTrace(
Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptNumber(), CONNECTION_ID_KEY, input.getConnectionId(), JOB_ID_KEY, input.getJobId()));
new AttemptContext(input.getConnectionId(), input.getJobId(), input.getAttemptNumber()).addTagsToTrace();

try {
final long jobId = input.getJobId();
final int attemptNumber = input.getAttemptNumber();
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptNumber, JOB_ID_KEY, jobId));
jobPersistence.failAttempt(jobId, attemptNumber);
jobPersistence.writeAttemptFailureSummary(jobId, attemptNumber, input.getAttemptFailureSummary());
jobPersistence.cancelJob(jobId);
Expand All @@ -382,8 +382,9 @@ public void jobCancelledWithAttemptNumber(final JobCancelledInputWithAttemptNumb
@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public void reportJobStart(final ReportJobStartInput input) {
new AttemptContext(input.getConnectionId(), input.getJobId(), null).addTagsToTrace();

try {
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, input.getJobId()));
final Job job = jobPersistence.getJob(input.getJobId());
jobTracker.trackSync(job, JobState.STARTED);
} catch (final IOException e) {
Expand All @@ -394,7 +395,7 @@ public void reportJobStart(final ReportJobStartInput input) {
@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public void ensureCleanJobState(final EnsureCleanJobStateInput input) {
ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, input.getConnectionId()));
new AttemptContext(input.getConnectionId(), null, null).addTagsToTrace();
failNonTerminalJobs(input.getConnectionId());
}

Expand Down
1 change: 1 addition & 0 deletions deps.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ mockk = { module = "io.mockk:mockk", version = "1.13.3" }
mockwebserver = { module = "com.squareup.okhttp3:mockwebserver", version.ref = "okhttp3" }
okhttp = { module = "com.squareup.okhttp3:okhttp", version.ref = "okhttp3" }
openapi-jackson-databind-nullable = { module = "org.openapitools:jackson-databind-nullable", version = "0.2.5" }
opentracing-util-test = { module = "io.opentracing:opentracing-util", version = "0.33.0" }
otel-bom = { module = "io.opentelemetry:opentelemetry-bom", version = "1.14.0" }
otel-sdk = { module = "io.opentelemetry:opentelemetry-sdk-metrics", version = "1.14.0" }
otel-sdk-testing = { module = "io.opentelemetry:opentelemetry-sdk-metrics-testing", version = "1.13.0-alpha" }
Expand Down

0 comments on commit 1305bc9

Please sign in to comment.