Skip to content

Commit

Permalink
add FailureReason and AttemptFailureSummary schema (#9527)
Browse files Browse the repository at this point in the history
* add FailureHelper

* add jobPersistence method for writing failure summary

* record source/destination failures and include them in ReplicationOutput and StandardSyncOutput

* handle failures in ConnectionManagerWorkflow, persist them when failing/cancelling an attempt

* rename attempt to attempt_id in FailureHelper

* test that ConnectionManagerWorkflow correctly records failures

* only set failures on ReplicationOutput if a failure actually occurred

* test that source or destination failure results in correct failureReason

* remove cancellation from failure summaries

* formatting, cleanup

* remove failureSummaryForCancellation

* rename failureSource -> failureOrigin, delete retryable, clarify failureType enum values

* actually persist attemptFailureSummary now that column exists

* use attemptNumber instead of attemptId where appropriate

* small fixes

* formatting

* use maybeAttemptId instead of connectionUpdaterInput.getAttemptNumber

* missed rename from failureSource to failureOrigin
  • Loading branch information
pmossman committed Jan 25, 2022
1 parent 894a072 commit 805c8d9
Show file tree
Hide file tree
Showing 25 changed files with 676 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/AttemptFailureSummary.yaml
title: AttemptFailureSummary
description: Attempt-level summarization of failures that occurred during a sync workflow.
type: object
additionalProperties: false
required:
- failures
properties:
failures:
description: Ordered list of failures that occurred during the attempt.
type: array
items:
"$ref": FailureReason.yaml
partialSuccess:
description: True if the number of committed records for this attempt was greater than 0. False if 0 records were committed.
type: boolean
44 changes: 44 additions & 0 deletions airbyte-config/models/src/main/resources/types/FailureReason.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/FailureReason.yaml
title: FailureSummary
type: object
required:
- failureOrigin
- timestamp
additionalProperties: false
properties:
failureOrigin:
description: Indicates where the error originated. If not set, the origin of error is not well known.
type: string
enum:
- unknown
- source
- destination
- replicationWorker
- persistence
- normalization
- dbt
failureType:
description: Categorizes well known errors into types for programmatic handling. If not set, the type of error is not well known.
type: string
enum:
- unknown
- userError
- systemError
- transient
internalMessage:
description: Human readable failure description for consumption by technical system operators, like Airbyte engineers or OSS users.
type: string
externalMessage:
description: Human readable failure description for presentation in the UI to non-technical users.
type: string
metadata:
description: Key-value pairs of relevant data
type: object
additionalProperties: true
stacktrace:
description: Raw stacktrace associated with the failure.
type: string
timestamp:
type: integer
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@ properties:
"$ref": State.yaml
output_catalog:
existingJavaType: io.airbyte.protocol.models.ConfiguredAirbyteCatalog
failures:
type: array
items:
"$ref": FailureReason.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@ properties:
"$ref": State.yaml
output_catalog:
existingJavaType: io.airbyte.protocol.models.ConfiguredAirbyteCatalog
failures:
type: array
items:
"$ref": FailureReason.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.scheduler.models;

import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.JobOutput;
import java.nio.file.Path;
import java.util.Objects;
Expand All @@ -16,6 +17,7 @@ public class Attempt {
private final long jobId;
private final JobOutput output;
private final AttemptStatus status;
private final AttemptFailureSummary failureSummary;
private final Path logPath;
private final long updatedAtInSecond;
private final long createdAtInSecond;
Expand All @@ -26,13 +28,15 @@ public Attempt(final long id,
final Path logPath,
final @Nullable JobOutput output,
final AttemptStatus status,
final @Nullable AttemptFailureSummary failureSummary,
final long createdAtInSecond,
final long updatedAtInSecond,
final @Nullable Long endedAtInSecond) {
this.id = id;
this.jobId = jobId;
this.output = output;
this.status = status;
this.failureSummary = failureSummary;
this.logPath = logPath;
this.updatedAtInSecond = updatedAtInSecond;
this.createdAtInSecond = createdAtInSecond;
Expand All @@ -55,6 +59,10 @@ public AttemptStatus getStatus() {
return status;
}

public Optional<AttemptFailureSummary> getFailureSummary() {
return Optional.ofNullable(failureSummary);
}

public Path getLogPath() {
return logPath;
}
Expand Down Expand Up @@ -90,13 +98,14 @@ public boolean equals(final Object o) {
createdAtInSecond == attempt.createdAtInSecond &&
Objects.equals(output, attempt.output) &&
status == attempt.status &&
Objects.equals(failureSummary, attempt.failureSummary) &&
Objects.equals(logPath, attempt.logPath) &&
Objects.equals(endedAtInSecond, attempt.endedAtInSecond);
}

@Override
public int hashCode() {
return Objects.hash(id, jobId, output, status, logPath, updatedAtInSecond, createdAtInSecond, endedAtInSecond);
return Objects.hash(id, jobId, output, status, failureSummary, logPath, updatedAtInSecond, createdAtInSecond, endedAtInSecond);
}

@Override
Expand All @@ -106,6 +115,7 @@ public String toString() {
", jobId=" + jobId +
", output=" + output +
", status=" + status +
", failureSummary=" + failureSummary +
", logPath=" + logPath +
", updatedAtInSecond=" + updatedAtInSecond +
", createdAtInSecond=" + createdAtInSecond +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ void testIsAttemptInTerminalState() {
}

private static Attempt attemptWithStatus(final AttemptStatus attemptStatus) {
return new Attempt(1L, 1L, null, null, attemptStatus, 0L, 0L, null);
return new Attempt(1L, 1L, null, null, attemptStatus, null, 0L, 0L, null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void testHasRunningAttempt() {

private static Job jobWithAttemptWithStatus(final AttemptStatus... attemptStatuses) {
final List<Attempt> attempts = Arrays.stream(attemptStatuses)
.map(attemptStatus -> new Attempt(1L, 1L, null, null, attemptStatus, 0L, 0L, null))
.map(attemptStatus -> new Attempt(1L, 1L, null, null, attemptStatus, null, 0L, 0L, null))
.collect(Collectors.toList());
return new Job(1L, null, null, null, attempts, null, 0L, 0L, 0L);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.airbyte.commons.text.Names;
import io.airbyte.commons.text.Sqls;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.JobOutput;
Expand Down Expand Up @@ -99,6 +100,7 @@ public class DefaultJobPersistence implements JobPersistence {
+ "attempts.log_path AS log_path,\n"
+ "attempts.output AS attempt_output,\n"
+ "attempts.status AS attempt_status,\n"
+ "attempts.failure_summary AS attempt_failure_summary,\n"
+ "attempts.created_at AS attempt_created_at,\n"
+ "attempts.updated_at AS attempt_updated_at,\n"
+ "attempts.ended_at AS attempt_ended_at\n"
Expand Down Expand Up @@ -322,6 +324,18 @@ public <T> void writeOutput(final long jobId, final int attemptNumber, final T o
.execute());
}

@Override
public void writeAttemptFailureSummary(final long jobId, final int attemptNumber, final AttemptFailureSummary failureSummary) throws IOException {
final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC);

jobDatabase.transaction(
ctx -> ctx.update(ATTEMPTS)
.set(ATTEMPTS.FAILURE_SUMMARY, JSONB.valueOf(Jsons.serialize(failureSummary)))
.set(ATTEMPTS.UPDATED_AT, now)
.where(ATTEMPTS.JOB_ID.eq(jobId), ATTEMPTS.ATTEMPT_NUMBER.eq(attemptNumber))
.execute());
}

@Override
public Job getJob(final long jobId) throws IOException {
return jobDatabase.query(ctx -> getJob(ctx, jobId));
Expand Down Expand Up @@ -441,6 +455,8 @@ private static Attempt getAttemptFromRecord(final Record record) {
Path.of(record.get("log_path", String.class)),
record.get("attempt_output", String.class) == null ? null : Jsons.deserialize(record.get("attempt_output", String.class), JobOutput.class),
Enums.toEnum(record.get("attempt_status", String.class), AttemptStatus.class).orElseThrow(),
record.get("attempt_failure_summary", String.class) == null ? null
: Jsons.deserialize(record.get("attempt_failure_summary", String.class), AttemptFailureSummary.class),
getEpoch(record, "attempt_created_at"),
getEpoch(record, "attempt_updated_at"),
Optional.ofNullable(record.get("attempt_ended_at"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.scheduler.persistence;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.db.instance.jobs.JobsDatabaseSchema;
Expand Down Expand Up @@ -125,6 +126,16 @@ public interface JobPersistence {
*/
<T> void writeOutput(long jobId, int attemptNumber, T output) throws IOException;

/**
* Writes a summary of all failures that occurred during the attempt.
*
* @param jobId job id
* @param attemptNumber attempt number
* @param failureSummary summary containing failure metadata and ordered list of failures
* @throws IOException exception due to interaction with persistence
*/
void writeAttemptFailureSummary(long jobId, int attemptNumber, AttemptFailureSummary failureSummary) throws IOException;

/**
* @param configTypes - type of config, e.g. sync
* @param configId - id of that config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import com.google.common.collect.Sets;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.text.Sqls;
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.FailureReason;
import io.airbyte.config.FailureReason.FailureOrigin;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.JobGetSpecConfig;
Expand Down Expand Up @@ -114,6 +117,7 @@ private static Attempt createAttempt(final long id, final long jobId, final Atte
logPath,
null,
status,
null,
NOW.getEpochSecond(),
NOW.getEpochSecond(),
NOW.getEpochSecond());
Expand All @@ -126,6 +130,7 @@ private static Attempt createUnfinishedAttempt(final long id, final long jobId,
logPath,
null,
status,
null,
NOW.getEpochSecond(),
NOW.getEpochSecond(),
null);
Expand Down Expand Up @@ -235,6 +240,23 @@ void testWriteOutput() throws IOException {
assertNotEquals(created.getAttempts().get(0).getUpdatedAtInSecond(), updated.getAttempts().get(0).getUpdatedAtInSecond());
}

@Test
@DisplayName("Should be able to read attemptFailureSummary that was written")
void testWriteAttemptFailureSummary() throws IOException {
final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH);
final Job created = jobPersistence.getJob(jobId);
final AttemptFailureSummary failureSummary = new AttemptFailureSummary().withFailures(
Collections.singletonList(new FailureReason().withFailureOrigin(FailureOrigin.SOURCE)));

when(timeSupplier.get()).thenReturn(Instant.ofEpochMilli(4242));
jobPersistence.writeAttemptFailureSummary(jobId, attemptNumber, failureSummary);

final Job updated = jobPersistence.getJob(jobId);
assertEquals(Optional.of(failureSummary), updated.getAttempts().get(0).getFailureSummary());
assertNotEquals(created.getAttempts().get(0).getUpdatedAtInSecond(), updated.getAttempts().get(0).getUpdatedAtInSecond());
}

@Test
@DisplayName("When getting the last replication job should return the most recently created job")
void testGetLastSyncJobWithMultipleAttempts() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private static AttemptRead toAttemptRead(final Attempt a) {
}

private static Attempt createSuccessfulAttempt(final long jobId, final long timestamps) {
return new Attempt(ATTEMPT_ID, jobId, LOG_PATH, null, AttemptStatus.SUCCEEDED, timestamps, timestamps, timestamps);
return new Attempt(ATTEMPT_ID, jobId, LOG_PATH, null, AttemptStatus.SUCCEEDED, null, timestamps, timestamps, timestamps);
}

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.workers;

import io.airbyte.config.FailureReason;
import io.airbyte.config.ReplicationAttemptSummary;
import io.airbyte.config.ReplicationOutput;
import io.airbyte.config.StandardSyncInput;
Expand All @@ -14,11 +15,13 @@
import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.workers.helper.FailureHelper;
import io.airbyte.workers.protocols.airbyte.AirbyteDestination;
import io.airbyte.workers.protocols.airbyte.AirbyteMapper;
import io.airbyte.workers.protocols.airbyte.AirbyteSource;
import io.airbyte.workers.protocols.airbyte.MessageTracker;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -27,6 +30,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -104,6 +108,9 @@ public ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRo
destinationConfig.setCatalog(mapper.mapCatalog(destinationConfig.getCatalog()));

final long startTime = System.currentTimeMillis();
final AtomicReference<FailureReason> sourceFailureRef = new AtomicReference<>();
final AtomicReference<FailureReason> destinationFailureRef = new AtomicReference<>();

try {
LOGGER.info("configured sync modes: {}", syncInput.getCatalog().getStreams()
.stream()
Expand All @@ -119,13 +126,23 @@ public ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRo
destination.start(destinationConfig, jobRoot);
source.start(sourceConfig, jobRoot);

// note: `whenComplete` is used instead of `exceptionally` so that the original exception is still
// thrown
final CompletableFuture<?> destinationOutputThreadFuture = CompletableFuture.runAsync(
getDestinationOutputRunnable(destination, cancelled, messageTracker, mdc),
executors);
executors).whenComplete((msg, ex) -> {
if (ex != null) {
destinationFailureRef.set(FailureHelper.destinationFailure(ex, Long.valueOf(jobId), attempt));
}
});

final CompletableFuture<?> replicationThreadFuture = CompletableFuture.runAsync(
getReplicationRunnable(source, destination, cancelled, mapper, messageTracker, mdc),
executors);
executors).whenComplete((msg, ex) -> {
if (ex != null) {
sourceFailureRef.set(FailureHelper.sourceFailure(ex, Long.valueOf(jobId), attempt));
}
});

LOGGER.info("Waiting for source and destination threads to complete.");
// CompletableFuture#allOf waits until all futures finish before returning, even if one throws an
Expand Down Expand Up @@ -198,11 +215,24 @@ else if (hasFailed.get()) {
.withEndTime(System.currentTimeMillis());

LOGGER.info("sync summary: {}", summary);

final ReplicationOutput output = new ReplicationOutput()
.withReplicationAttemptSummary(summary)
.withOutputCatalog(destinationConfig.getCatalog());

// only .setFailures() if a failure occurred
final FailureReason sourceFailure = sourceFailureRef.get();
final FailureReason destinationFailure = destinationFailureRef.get();
final List<FailureReason> failures = new ArrayList<>();
if (sourceFailure != null) {
failures.add(sourceFailure);
}
if (destinationFailure != null) {
failures.add(destinationFailure);
}
if (!failures.isEmpty()) {
output.setFailures(failures);
}

if (messageTracker.getSourceOutputState().isPresent()) {
LOGGER.info("Source output at least one state message");
} else {
Expand Down
Loading

0 comments on commit 805c8d9

Please sign in to comment.