Skip to content

Commit

Permalink
Add AttemptFailureSummary to API response (#10022)
Browse files Browse the repository at this point in the history
* add attempt failure info to api yml

* populate failureSummary in JobConverter

* tests and formatting

* add Partial Success to glossary section of docs

* failure summary naming/enum tweaks

* tweak enum values and add retryable boolean instead of transient failureType
  • Loading branch information
pmossman committed Feb 4, 2022
1 parent 2eb73c5 commit 01f4675
Show file tree
Hide file tree
Showing 9 changed files with 367 additions and 17 deletions.
50 changes: 50 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3204,6 +3204,8 @@ components:
type: array
items:
$ref: "#/components/schemas/AttemptStreamStats"
failureSummary:
$ref: "#/components/schemas/AttemptFailureSummary"
AttemptStats:
type: object
properties:
Expand All @@ -3229,6 +3231,54 @@ components:
type: string
stats:
$ref: "#/components/schemas/AttemptStats"
AttemptFailureSummary:
type: object
required:
- failures
properties:
failures:
type: array
items:
$ref: "#/components/schemas/AttemptFailureReason"
partialSuccess:
description: True if the number of committed records for this attempt was greater than 0. False if 0 records were committed. If not set, the number of committed records is unknown.
type: boolean
AttemptFailureReason:
type: object
required:
- timestamp
properties:
failureOrigin:
$ref: "#/components/schemas/AttemptFailureOrigin"
failureType:
$ref: "#/components/schemas/AttemptFailureType"
externalMessage:
type: string
stacktrace:
type: string
retryable:
description: True if it is known that retrying may succeed, e.g. for a transient failure. False if it is known that a retry will not succeed, e.g. for a configuration issue. If not set, retryable status is not well known.
type: boolean
timestamp:
type: integer
format: int64
AttemptFailureOrigin:
description: Indicates where the error originated. If not set, the origin of the error is not well known.
type: string
enum:
- source
- destination
- replication
- persistence
- normalization
- dbt
AttemptFailureType:
description: Categorizes well known errors into types for programmatic handling. If not set, the type of error is not well known.
type: string
enum:
- config_error
- system_error
- manual_cancellation
AttemptStatus:
type: string
enum:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ properties:
items:
"$ref": FailureReason.yaml
partialSuccess:
description: True if the number of committed records for this attempt was greater than 0. False if 0 records were committed.
description: True if the number of committed records for this attempt was greater than 0. False if 0 records were committed. Blank if the number of committed records is unknown.
type: boolean
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,26 @@
title: FailureSummary
type: object
required:
- failureOrigin
- timestamp
additionalProperties: false
properties:
failureOrigin:
description: Indicates where the error originated. If not set, the origin of the error is not well known.
type: string
enum:
- unknown
- source
- destination
- replicationWorker
- replication
- persistence
- normalization
- dbt
failureType:
description: Categorizes well known errors into types for programmatic handling. If not set, the type of error is not well known.
type: string
enum:
- unknown
- userError
- configError
- systemError
- transient
- manualCancellation
internalMessage:
description: Human readable failure description for consumption by technical system operators, like Airbyte engineers or OSS users.
type: string
Expand All @@ -40,5 +37,8 @@ properties:
stacktrace:
description: Raw stacktrace associated with the failure.
type: string
retryable:
description: True if it is known that retrying may succeed, e.g. for a transient failure. False if it is known that a retry will not succeed, e.g. for a configuration issue. If not set, retryable status is not well known.
type: boolean
timestamp:
type: integer
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

package io.airbyte.server.converters;

import io.airbyte.api.model.AttemptFailureOrigin;
import io.airbyte.api.model.AttemptFailureReason;
import io.airbyte.api.model.AttemptFailureSummary;
import io.airbyte.api.model.AttemptFailureType;
import io.airbyte.api.model.AttemptInfoRead;
import io.airbyte.api.model.AttemptRead;
import io.airbyte.api.model.AttemptStats;
Expand Down Expand Up @@ -110,10 +114,11 @@ public static AttemptRead getAttemptRead(final Attempt attempt) {
.streamStats(getAttemptStreamStats(attempt))
.createdAt(attempt.getCreatedAtInSecond())
.updatedAt(attempt.getUpdatedAtInSecond())
.endedAt(attempt.getEndedAtInSecond().orElse(null));
.endedAt(attempt.getEndedAtInSecond().orElse(null))
.failureSummary(getAttemptFailureSummary(attempt));
}

public static AttemptStats getTotalAttemptStats(final Attempt attempt) {
private static AttemptStats getTotalAttemptStats(final Attempt attempt) {
final SyncStats totalStats = attempt.getOutput()
.map(JobOutput::getSync)
.map(StandardSyncOutput::getStandardSyncSummary)
Expand All @@ -131,7 +136,7 @@ public static AttemptStats getTotalAttemptStats(final Attempt attempt) {
.recordsCommitted(totalStats.getRecordsCommitted());
}

public static List<AttemptStreamStats> getAttemptStreamStats(final Attempt attempt) {
private static List<AttemptStreamStats> getAttemptStreamStats(final Attempt attempt) {
final List<StreamSyncStats> streamStats = attempt.getOutput()
.map(JobOutput::getSync)
.map(StandardSyncOutput::getStandardSyncSummary)
Expand All @@ -149,6 +154,25 @@ public static List<AttemptStreamStats> getAttemptStreamStats(final Attempt attem
.collect(Collectors.toList());
}

/**
 * Translates the failure summary stored on an attempt into its API model counterpart.
 *
 * Each stored failure reason is copied field by field (origin, type, external message,
 * stacktrace, timestamp, retryable) into an API failure reason; the partial-success flag
 * is carried over unchanged.
 *
 * @param attempt attempt whose optional failure summary is read
 * @return the API failure summary, or {@code null} when the attempt has no failure summary
 */
private static AttemptFailureSummary getAttemptFailureSummary(final Attempt attempt) {
  return attempt.getFailureSummary()
      .map(summary -> {
        // Convert every config-layer failure reason into the API-layer shape.
        final List<AttemptFailureReason> apiFailures = summary.getFailures()
            .stream()
            .map(reason -> new AttemptFailureReason()
                .failureOrigin(Enums.convertTo(reason.getFailureOrigin(), AttemptFailureOrigin.class))
                .failureType(Enums.convertTo(reason.getFailureType(), AttemptFailureType.class))
                .externalMessage(reason.getExternalMessage())
                .stacktrace(reason.getStacktrace())
                .timestamp(reason.getTimestamp())
                .retryable(reason.getRetryable()))
            .collect(Collectors.toList());

        return new AttemptFailureSummary()
            .failures(apiFailures)
            .partialSuccess(summary.getPartialSuccess());
      })
      // The API model leaves failureSummary unset when the attempt recorded none.
      .orElse(null);
}

public LogRead getLogRead(final Path logPath) {
try {
return new LogRead().logLines(LogClientSingleton.getInstance().getJobLogFile(workerEnvironment, logConfigs, logPath));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,28 @@
import static org.mockito.Mockito.when;

import com.google.common.collect.Lists;
import io.airbyte.api.model.*;
import io.airbyte.api.model.AttemptFailureOrigin;
import io.airbyte.api.model.AttemptFailureReason;
import io.airbyte.api.model.AttemptFailureSummary;
import io.airbyte.api.model.AttemptFailureType;
import io.airbyte.api.model.AttemptInfoRead;
import io.airbyte.api.model.AttemptRead;
import io.airbyte.api.model.AttemptStats;
import io.airbyte.api.model.AttemptStreamStats;
import io.airbyte.api.model.DestinationDefinitionRead;
import io.airbyte.api.model.JobConfigType;
import io.airbyte.api.model.JobDebugRead;
import io.airbyte.api.model.JobInfoRead;
import io.airbyte.api.model.JobRead;
import io.airbyte.api.model.JobWithAttemptsRead;
import io.airbyte.api.model.LogRead;
import io.airbyte.api.model.SourceDefinitionRead;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.FailureReason;
import io.airbyte.config.FailureReason.FailureOrigin;
import io.airbyte.config.FailureReason.FailureType;
import io.airbyte.config.JobCheckConnectionConfig;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobOutput;
Expand Down Expand Up @@ -53,6 +71,10 @@ class JobConverterTest {
private static final long RECORDS_COMMITTED = 10L;
private static final long STATE_MESSAGES_EMITTED = 2L;
private static final String STREAM_NAME = "stream1";
private static final String FAILURE_EXTERNAL_MESSAGE = "something went wrong";
private static final long FAILURE_TIMESTAMP = System.currentTimeMillis();
private static final String FAILURE_STACKTRACE = "stacktrace";
private static final boolean PARTIAL_SUCCESS = false;

private static final JobOutput JOB_OUTPUT = new JobOutput()
.withOutputType(OutputType.SYNC)
Expand Down Expand Up @@ -105,7 +127,15 @@ class JobConverterTest {
.recordsCommitted(RECORDS_COMMITTED))))
.updatedAt(CREATED_AT)
.createdAt(CREATED_AT)
.endedAt(CREATED_AT))
.endedAt(CREATED_AT)
.failureSummary(new AttemptFailureSummary()
.failures(Lists.newArrayList(new AttemptFailureReason()
.failureOrigin(AttemptFailureOrigin.SOURCE)
.failureType(AttemptFailureType.SYSTEM_ERROR)
.externalMessage(FAILURE_EXTERNAL_MESSAGE)
.stacktrace(FAILURE_STACKTRACE)
.timestamp(FAILURE_TIMESTAMP)))
.partialSuccess(PARTIAL_SUCCESS)))
.logs(new LogRead().logLines(new ArrayList<>()))));

private static final String version = "0.33.4";
Expand All @@ -128,6 +158,15 @@ class JobConverterTest {
.job(JOB_INFO.getJob())
.attempts(JOB_INFO.getAttempts().stream().map(AttemptInfoRead::getAttempt).collect(Collectors.toList()));

// Config-layer failure summary fixture handed back by the mocked attempt in setUp().
// It is built from the same FAILURE_* / PARTIAL_SUCCESS constants as the API-layer
// failureSummary expected inside JOB_INFO. The config type is fully qualified because
// io.airbyte.api.model.AttemptFailureSummary is also imported under the same simple name.
private static final io.airbyte.config.AttemptFailureSummary FAILURE_SUMMARY = new io.airbyte.config.AttemptFailureSummary()
.withFailures(Lists.newArrayList(new FailureReason()
.withFailureOrigin(FailureOrigin.SOURCE)
.withFailureType(FailureType.SYSTEM_ERROR)
.withExternalMessage(FAILURE_EXTERNAL_MESSAGE)
.withStacktrace(FAILURE_STACKTRACE)
.withTimestamp(FAILURE_TIMESTAMP)))
.withPartialSuccess(PARTIAL_SUCCESS);

@BeforeEach
public void setUp() {
jobConverter = new JobConverter(WorkerEnvironment.DOCKER, LogConfigs.EMPTY);
Expand All @@ -148,6 +187,7 @@ public void setUp() {
when(attempt.getCreatedAtInSecond()).thenReturn(CREATED_AT);
when(attempt.getUpdatedAtInSecond()).thenReturn(CREATED_AT);
when(attempt.getEndedAtInSecond()).thenReturn(Optional.of(CREATED_AT));
when(attempt.getFailureSummary()).thenReturn(Optional.of(FAILURE_SUMMARY));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ public static FailureReason destinationFailure(final Throwable t, final Long job
.withExternalMessage("Something went wrong within the destination connector");
}

public static FailureReason replicationWorkerFailure(final Throwable t, final Long jobId, final Integer attemptNumber) {
public static FailureReason replicationFailure(final Throwable t, final Long jobId, final Integer attemptNumber) {
return genericFailure(t, jobId, attemptNumber)
.withFailureOrigin(FailureOrigin.REPLICATION_WORKER)
.withFailureOrigin(FailureOrigin.REPLICATION)
.withExternalMessage("Something went wrong during replication");
}

Expand All @@ -73,7 +73,6 @@ public static FailureReason dbtFailure(final Throwable t, final Long jobId, fina

public static FailureReason unknownOriginFailure(final Throwable t, final Long jobId, final Integer attemptNumber) {
return genericFailure(t, jobId, attemptNumber)
.withFailureOrigin(FailureOrigin.UNKNOWN)
.withExternalMessage("An unknown failure occurred");
}

Expand All @@ -89,7 +88,7 @@ public static FailureReason failureReasonFromWorkflowAndActivity(final String wo
final Long jobId,
final Integer attemptNumber) {
if (workflowType.equals(WORKFLOW_TYPE_SYNC) && activityType.equals(ACTIVITY_TYPE_REPLICATE)) {
return replicationWorkerFailure(t, jobId, attemptNumber);
return replicationFailure(t, jobId, attemptNumber);
} else if (workflowType.equals(WORKFLOW_TYPE_SYNC) && activityType.equals(ACTIVITY_TYPE_PERSIST)) {
return persistenceFailure(t, jobId, attemptNumber);
} else if (workflowType.equals(WORKFLOW_TYPE_SYNC) && activityType.equals(ACTIVITY_TYPE_NORMALIZE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ public void testReplicationFailureRecorded() {
testEnv.sleep(Duration.ofMinutes(2L));
workflow.submitManualSync();

Mockito.verify(mJobCreationAndStatusUpdateActivity).attemptFailure(Mockito.argThat(new HasFailureFromSource(FailureOrigin.REPLICATION_WORKER)));
Mockito.verify(mJobCreationAndStatusUpdateActivity).attemptFailure(Mockito.argThat(new HasFailureFromSource(FailureOrigin.REPLICATION)));

testEnv.shutdown();
}
Expand Down

0 comments on commit 01f4675

Please sign in to comment.