Skip to content

Commit

Permalink
馃悰 platform: Fix silent failures in sources (#4617)
Browse files Browse the repository at this point in the history
  • Loading branch information
sherifnada committed Jul 8, 2021
1 parent 9b20c1a commit aca70d0
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ public IntegrationRunner(Source source) {
}

@VisibleForTesting
IntegrationRunner(IntegrationCliParser cliParser, Consumer<AirbyteMessage> outputRecordCollector, Destination destination, Source source) {
IntegrationRunner(IntegrationCliParser cliParser,
Consumer<AirbyteMessage> outputRecordCollector,
Destination destination,
Source source) {
Preconditions.checkState(destination != null ^ source != null, "can only pass in a destination or a source");
this.cliParser = cliParser;
this.outputRecordCollector = outputRecordCollector;
Expand Down Expand Up @@ -97,6 +100,7 @@ public void run(String[] args) throws Exception {
// todo (cgardens) - it is incongruous that that read and write return airbyte message (the
// envelope) while the other commands return what goes inside it.
case READ -> {

final JsonNode config = parseConfig(parsed.getConfigPath());
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
final Optional<JsonNode> stateOptional = parsed.getStatePath().map(IntegrationRunner::parseConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,12 @@ public ReplicationOutput run(StandardSyncInput syncInput, Path jobRoot) throws W
}

final ReplicationStatus outputStatus;
// First check if the process was cancelled. Cancellation takes precedence over failures.
if (cancelled.get()) {
outputStatus = ReplicationStatus.CANCELLED;
} else if (hasFailed.get()) {
}
// if the process was not cancelled but still failed, then it's an actual failure
else if (hasFailed.get()) {
outputStatus = ReplicationStatus.FAILED;
} else {
outputStatus = ReplicationStatus.COMPLETED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void notifyEndOfStream() throws IOException {
}

@Override
public void close() throws IOException {
public void close() throws Exception {
if (destinationProcess == null) {
return;
}
Expand All @@ -122,9 +122,9 @@ public void close() throws IOException {
LOGGER.debug("Closing destination process");
WorkerUtils.gentleClose(destinationProcess, 10, TimeUnit.HOURS);
if (destinationProcess.isAlive() || destinationProcess.exitValue() != 0) {
LOGGER.warn(
"Destination process might not have shut down correctly. destination process alive: {}, destination process exit value: {}. This warning is normal if the job was cancelled.",
destinationProcess.isAlive(), destinationProcess.exitValue());
String message =
destinationProcess.isAlive() ? "Destination has not terminated " : "Destination process exit with code " + destinationProcess.exitValue();
throw new WorkerException(message + ". This warning is normal if the job was cancelled.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerException;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.process.IntegrationLauncher;
import java.nio.file.Path;
Expand Down Expand Up @@ -129,9 +130,8 @@ public void close() throws Exception {
FORCED_SHUTDOWN_DURATION);

if (sourceProcess.isAlive() || sourceProcess.exitValue() != 0) {
LOGGER.warn(
"Source process might not have shut down correctly. source process alive: {}, source process exit value: {}. This warning is normal if the job was cancelled.",
sourceProcess.isAlive(), sourceProcess.exitValue());
String message = sourceProcess.isAlive() ? "Source has not terminated " : "Source process exit with code " + sourceProcess.exitValue();
throw new WorkerException(message + ". This warning is normal if the job was cancelled.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,43 @@ void testCancellation() throws InterruptedException {

@Test
void testPopulatesOutputOnSuccess() throws WorkerException {
testPopulatesOutput();
final JsonNode expectedState = Jsons.jsonNode(ImmutableMap.of("updated_at", 10L));
when(sourceMessageTracker.getRecordCount()).thenReturn(12L);
when(sourceMessageTracker.getBytesCount()).thenReturn(100L);
when(destinationMessageTracker.getOutputState()).thenReturn(Optional.of(new State().withState(expectedState)));

final ReplicationWorker worker = new DefaultReplicationWorker(
JOB_ID,
JOB_ATTEMPT,
source,
mapper,
destination,
sourceMessageTracker,
destinationMessageTracker);

final ReplicationOutput actual = worker.run(syncInput, jobRoot);
final ReplicationOutput replicationOutput = new ReplicationOutput()
.withReplicationAttemptSummary(new ReplicationAttemptSummary()
.withRecordsSynced(12L)
.withBytesSynced(100L)
.withStatus(ReplicationStatus.COMPLETED))
.withOutputCatalog(syncInput.getCatalog())
.withState(new State().withState(expectedState));

// good enough to verify that times are present.
assertNotNull(actual.getReplicationAttemptSummary().getStartTime());
assertNotNull(actual.getReplicationAttemptSummary().getEndTime());

// verify output object matches declared json schema spec.
final Set<String> validate = new JsonSchemaValidator()
.validate(Jsons.jsonNode(Jsons.jsonNode(JsonSchemaValidator.getSchema(ConfigSchema.REPLICATION_OUTPUT.getFile()))), Jsons.jsonNode(actual));
assertTrue(validate.isEmpty(), "Validation errors: " + Strings.join(validate, ","));

// remove times so we can do the rest of the object <> object comparison.
actual.getReplicationAttemptSummary().withStartTime(null);
actual.getReplicationAttemptSummary().withEndTime(null);

assertEquals(replicationOutput, actual);
}

@Test
Expand Down Expand Up @@ -295,44 +331,4 @@ void testDoesNotPopulateOnIrrecoverableFailure() {
assertThrows(WorkerException.class, () -> worker.run(syncInput, jobRoot));
}

private void testPopulatesOutput() throws WorkerException {
final JsonNode expectedState = Jsons.jsonNode(ImmutableMap.of("updated_at", 10L));
when(sourceMessageTracker.getRecordCount()).thenReturn(12L);
when(sourceMessageTracker.getBytesCount()).thenReturn(100L);
when(destinationMessageTracker.getOutputState()).thenReturn(Optional.of(new State().withState(expectedState)));

final ReplicationWorker worker = new DefaultReplicationWorker(
JOB_ID,
JOB_ATTEMPT,
source,
mapper,
destination,
sourceMessageTracker,
destinationMessageTracker);

final ReplicationOutput actual = worker.run(syncInput, jobRoot);
final ReplicationOutput replicationOutput = new ReplicationOutput()
.withReplicationAttemptSummary(new ReplicationAttemptSummary()
.withRecordsSynced(12L)
.withBytesSynced(100L)
.withStatus(ReplicationStatus.COMPLETED))
.withOutputCatalog(syncInput.getCatalog())
.withState(new State().withState(expectedState));

// good enough to verify that times are present.
assertNotNull(actual.getReplicationAttemptSummary().getStartTime());
assertNotNull(actual.getReplicationAttemptSummary().getEndTime());

// verify output object matches declared json schema spec.
final Set<String> validate = new JsonSchemaValidator()
.validate(Jsons.jsonNode(Jsons.jsonNode(JsonSchemaValidator.getSchema(ConfigSchema.REPLICATION_OUTPUT.getFile()))), Jsons.jsonNode(actual));
assertTrue(validate.isEmpty(), "Validation errors: " + Strings.join(validate, ","));

// remove times so we can do the rest of the object <> object comparison.
actual.getReplicationAttemptSummary().withStartTime(null);
actual.getReplicationAttemptSummary().withEndTime(null);

assertEquals(replicationOutput, actual);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,14 @@ public void testCloseNotifiesLifecycle() throws Exception {
verify(outputStream).close();
}

@Test
public void testNonzeroExitCodeThrowsException() throws Exception {
final AirbyteDestination destination = new DefaultAirbyteDestination(integrationLauncher);
destination.start(DESTINATION_CONFIG, jobRoot);

when(process.isAlive()).thenReturn(false);
when(process.exitValue()).thenReturn(1);
Assertions.assertThrows(WorkerException.class, destination::close);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,14 @@ public void testSuccessfulLifecycle() throws Exception {
verify(process).exitValue();
}

@Test
public void testNonzeroExitCodeThrows() throws Exception {
final AirbyteSource tap = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor);
tap.start(SOURCE_CONFIG, jobRoot);

when(process.exitValue()).thenReturn(1);

Assertions.assertThrows(WorkerException.class, tap::close);
}

}

0 comments on commit aca70d0

Please sign in to comment.