Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist Job Failure summaries for failed and cancelled attempts #9527

Merged
merged 1 commit into from
Jan 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/AttemptFailureSummary.yaml
title: AttemptFailureSummary
description: Attempt-level summarization of failures that occurred during a sync workflow.
type: object
additionalProperties: false
required:
- failures
properties:
failures:
description: Ordered list of failures that occurred during the attempt.
type: array
items:
"$ref": FailureReason.yaml
partialSuccess:
description: True if the number of committed records for this attempt was greater than 0. False if 0 records were committed.
type: boolean
44 changes: 44 additions & 0 deletions airbyte-config/models/src/main/resources/types/FailureReason.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/FailureReason.yaml
title: FailureSummary
type: object
required:
- failureOrigin
- timestamp
additionalProperties: false
properties:
failureOrigin:
description: Indicates where the error originated. If not set, the origin of error is not well known.
type: string
enum:
- unknown
- source
- destination
- replicationWorker
- persistence
- normalization
- dbt
failureType:
description: Categorizes well known errors into types for programmatic handling. If not set, the type of error is not well known.
type: string
enum:
- unknown
- userError
- systemError
- transient
internalMessage:
description: Human readable failure description for consumption by technical system operators, like Airbyte engineers or OSS users.
type: string
externalMessage:
description: Human readable failure description for presentation in the UI to non-technical users.
type: string
metadata:
description: Key-value pairs of relevant data
type: object
additionalProperties: true
stacktrace:
description: Raw stacktrace associated with the failure.
type: string
timestamp:
type: integer
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@ properties:
"$ref": State.yaml
output_catalog:
existingJavaType: io.airbyte.protocol.models.ConfiguredAirbyteCatalog
failures:
type: array
items:
"$ref": FailureReason.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@ properties:
"$ref": State.yaml
output_catalog:
existingJavaType: io.airbyte.protocol.models.ConfiguredAirbyteCatalog
failures:
type: array
items:
"$ref": FailureReason.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.scheduler.models;

import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.JobOutput;
import java.nio.file.Path;
import java.util.Objects;
Expand All @@ -16,6 +17,7 @@ public class Attempt {
private final long jobId;
private final JobOutput output;
private final AttemptStatus status;
private final AttemptFailureSummary failureSummary;
private final Path logPath;
private final long updatedAtInSecond;
private final long createdAtInSecond;
Expand All @@ -26,13 +28,15 @@ public Attempt(final long id,
final Path logPath,
final @Nullable JobOutput output,
final AttemptStatus status,
final @Nullable AttemptFailureSummary failureSummary,
final long createdAtInSecond,
final long updatedAtInSecond,
final @Nullable Long endedAtInSecond) {
this.id = id;
this.jobId = jobId;
this.output = output;
this.status = status;
this.failureSummary = failureSummary;
this.logPath = logPath;
this.updatedAtInSecond = updatedAtInSecond;
this.createdAtInSecond = createdAtInSecond;
Expand All @@ -55,6 +59,10 @@ public AttemptStatus getStatus() {
return status;
}

public Optional<AttemptFailureSummary> getFailureSummary() {
return Optional.ofNullable(failureSummary);
}

public Path getLogPath() {
return logPath;
}
Expand Down Expand Up @@ -90,13 +98,14 @@ public boolean equals(final Object o) {
createdAtInSecond == attempt.createdAtInSecond &&
Objects.equals(output, attempt.output) &&
status == attempt.status &&
Objects.equals(failureSummary, attempt.failureSummary) &&
Objects.equals(logPath, attempt.logPath) &&
Objects.equals(endedAtInSecond, attempt.endedAtInSecond);
}

@Override
public int hashCode() {
return Objects.hash(id, jobId, output, status, logPath, updatedAtInSecond, createdAtInSecond, endedAtInSecond);
return Objects.hash(id, jobId, output, status, failureSummary, logPath, updatedAtInSecond, createdAtInSecond, endedAtInSecond);
}

@Override
Expand All @@ -106,6 +115,7 @@ public String toString() {
", jobId=" + jobId +
", output=" + output +
", status=" + status +
", failureSummary=" + failureSummary +
", logPath=" + logPath +
", updatedAtInSecond=" + updatedAtInSecond +
", createdAtInSecond=" + createdAtInSecond +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ void testIsAttemptInTerminalState() {
}

private static Attempt attemptWithStatus(final AttemptStatus attemptStatus) {
return new Attempt(1L, 1L, null, null, attemptStatus, 0L, 0L, null);
return new Attempt(1L, 1L, null, null, attemptStatus, null, 0L, 0L, null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void testHasRunningAttempt() {

private static Job jobWithAttemptWithStatus(final AttemptStatus... attemptStatuses) {
final List<Attempt> attempts = Arrays.stream(attemptStatuses)
.map(attemptStatus -> new Attempt(1L, 1L, null, null, attemptStatus, 0L, 0L, null))
.map(attemptStatus -> new Attempt(1L, 1L, null, null, attemptStatus, null, 0L, 0L, null))
.collect(Collectors.toList());
return new Job(1L, null, null, null, attempts, null, 0L, 0L, 0L);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.airbyte.commons.text.Names;
import io.airbyte.commons.text.Sqls;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.JobOutput;
Expand Down Expand Up @@ -99,6 +100,7 @@ public class DefaultJobPersistence implements JobPersistence {
+ "attempts.log_path AS log_path,\n"
+ "attempts.output AS attempt_output,\n"
+ "attempts.status AS attempt_status,\n"
+ "attempts.failure_summary AS attempt_failure_summary,\n"
+ "attempts.created_at AS attempt_created_at,\n"
+ "attempts.updated_at AS attempt_updated_at,\n"
+ "attempts.ended_at AS attempt_ended_at\n"
Expand Down Expand Up @@ -322,6 +324,18 @@ public <T> void writeOutput(final long jobId, final int attemptNumber, final T o
.execute());
}

@Override
public void writeAttemptFailureSummary(final long jobId, final int attemptNumber, final AttemptFailureSummary failureSummary) throws IOException {
final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC);

jobDatabase.transaction(
ctx -> ctx.update(ATTEMPTS)
.set(ATTEMPTS.FAILURE_SUMMARY, JSONB.valueOf(Jsons.serialize(failureSummary)))
.set(ATTEMPTS.UPDATED_AT, now)
.where(ATTEMPTS.JOB_ID.eq(jobId), ATTEMPTS.ATTEMPT_NUMBER.eq(attemptNumber))
.execute());
}

@Override
public Job getJob(final long jobId) throws IOException {
return jobDatabase.query(ctx -> getJob(ctx, jobId));
Expand Down Expand Up @@ -441,6 +455,8 @@ private static Attempt getAttemptFromRecord(final Record record) {
Path.of(record.get("log_path", String.class)),
record.get("attempt_output", String.class) == null ? null : Jsons.deserialize(record.get("attempt_output", String.class), JobOutput.class),
Enums.toEnum(record.get("attempt_status", String.class), AttemptStatus.class).orElseThrow(),
record.get("attempt_failure_summary", String.class) == null ? null
: Jsons.deserialize(record.get("attempt_failure_summary", String.class), AttemptFailureSummary.class),
getEpoch(record, "attempt_created_at"),
getEpoch(record, "attempt_updated_at"),
Optional.ofNullable(record.get("attempt_ended_at"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.scheduler.persistence;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.db.instance.jobs.JobsDatabaseSchema;
Expand Down Expand Up @@ -125,6 +126,16 @@ public interface JobPersistence {
*/
<T> void writeOutput(long jobId, int attemptNumber, T output) throws IOException;

/**
* Writes a summary of all failures that occurred during the attempt.
*
* @param jobId job id
* @param attemptNumber attempt number
* @param failureSummary summary containing failure metadata and ordered list of failures
* @throws IOException exception due to interaction with persistence
*/
void writeAttemptFailureSummary(long jobId, int attemptNumber, AttemptFailureSummary failureSummary) throws IOException;

/**
* @param configTypes - type of config, e.g. sync
* @param configId - id of that config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import com.google.common.collect.Sets;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.text.Sqls;
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.FailureReason;
import io.airbyte.config.FailureReason.FailureOrigin;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.JobGetSpecConfig;
Expand Down Expand Up @@ -114,6 +117,7 @@ private static Attempt createAttempt(final long id, final long jobId, final Atte
logPath,
null,
status,
null,
NOW.getEpochSecond(),
NOW.getEpochSecond(),
NOW.getEpochSecond());
Expand All @@ -126,6 +130,7 @@ private static Attempt createUnfinishedAttempt(final long id, final long jobId,
logPath,
null,
status,
null,
NOW.getEpochSecond(),
NOW.getEpochSecond(),
null);
Expand Down Expand Up @@ -235,6 +240,23 @@ void testWriteOutput() throws IOException {
assertNotEquals(created.getAttempts().get(0).getUpdatedAtInSecond(), updated.getAttempts().get(0).getUpdatedAtInSecond());
}

@Test
@DisplayName("Should be able to read attemptFailureSummary that was written")
void testWriteAttemptFailureSummary() throws IOException {
final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH);
final Job created = jobPersistence.getJob(jobId);
final AttemptFailureSummary failureSummary = new AttemptFailureSummary().withFailures(
Collections.singletonList(new FailureReason().withFailureOrigin(FailureOrigin.SOURCE)));

when(timeSupplier.get()).thenReturn(Instant.ofEpochMilli(4242));
jobPersistence.writeAttemptFailureSummary(jobId, attemptNumber, failureSummary);

final Job updated = jobPersistence.getJob(jobId);
assertEquals(Optional.of(failureSummary), updated.getAttempts().get(0).getFailureSummary());
assertNotEquals(created.getAttempts().get(0).getUpdatedAtInSecond(), updated.getAttempts().get(0).getUpdatedAtInSecond());
}

@Test
@DisplayName("When getting the last replication job should return the most recently created job")
void testGetLastSyncJobWithMultipleAttempts() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private static AttemptRead toAttemptRead(final Attempt a) {
}

private static Attempt createSuccessfulAttempt(final long jobId, final long timestamps) {
return new Attempt(ATTEMPT_ID, jobId, LOG_PATH, null, AttemptStatus.SUCCEEDED, timestamps, timestamps, timestamps);
return new Attempt(ATTEMPT_ID, jobId, LOG_PATH, null, AttemptStatus.SUCCEEDED, null, timestamps, timestamps, timestamps);
}

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.workers;

import io.airbyte.config.FailureReason;
import io.airbyte.config.ReplicationAttemptSummary;
import io.airbyte.config.ReplicationOutput;
import io.airbyte.config.StandardSyncInput;
Expand All @@ -14,11 +15,13 @@
import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.workers.helper.FailureHelper;
import io.airbyte.workers.protocols.airbyte.AirbyteDestination;
import io.airbyte.workers.protocols.airbyte.AirbyteMapper;
import io.airbyte.workers.protocols.airbyte.AirbyteSource;
import io.airbyte.workers.protocols.airbyte.MessageTracker;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -27,6 +30,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -104,6 +108,9 @@ public ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRo
destinationConfig.setCatalog(mapper.mapCatalog(destinationConfig.getCatalog()));

final long startTime = System.currentTimeMillis();
final AtomicReference<FailureReason> sourceFailureRef = new AtomicReference<>();
final AtomicReference<FailureReason> destinationFailureRef = new AtomicReference<>();

try {
LOGGER.info("configured sync modes: {}", syncInput.getCatalog().getStreams()
.stream()
Expand All @@ -119,13 +126,23 @@ public ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRo
destination.start(destinationConfig, jobRoot);
source.start(sourceConfig, jobRoot);

// note: `whenComplete` is used instead of `exceptionally` so that the original exception is still
// thrown
final CompletableFuture<?> destinationOutputThreadFuture = CompletableFuture.runAsync(
getDestinationOutputRunnable(destination, cancelled, messageTracker, mdc),
executors);
executors).whenComplete((msg, ex) -> {
if (ex != null) {
destinationFailureRef.set(FailureHelper.destinationFailure(ex, Long.valueOf(jobId), attempt));
}
});

final CompletableFuture<?> replicationThreadFuture = CompletableFuture.runAsync(
getReplicationRunnable(source, destination, cancelled, mapper, messageTracker, mdc),
executors);
executors).whenComplete((msg, ex) -> {
if (ex != null) {
sourceFailureRef.set(FailureHelper.sourceFailure(ex, Long.valueOf(jobId), attempt));
}
});

LOGGER.info("Waiting for source and destination threads to complete.");
// CompletableFuture#allOf waits until all futures finish before returning, even if one throws an
Expand Down Expand Up @@ -198,11 +215,24 @@ else if (hasFailed.get()) {
.withEndTime(System.currentTimeMillis());

LOGGER.info("sync summary: {}", summary);

final ReplicationOutput output = new ReplicationOutput()
.withReplicationAttemptSummary(summary)
.withOutputCatalog(destinationConfig.getCatalog());

// only .setFailures() if a failure occurred
final FailureReason sourceFailure = sourceFailureRef.get();
final FailureReason destinationFailure = destinationFailureRef.get();
final List<FailureReason> failures = new ArrayList<>();
if (sourceFailure != null) {
failures.add(sourceFailure);
}
if (destinationFailure != null) {
failures.add(destinationFailure);
}
if (!failures.isEmpty()) {
output.setFailures(failures);
}

if (messageTracker.getSourceOutputState().isPresent()) {
LOGGER.info("Source output at least one state message");
} else {
Expand Down
Loading