Skip to content

Commit

Permalink
Handling configuration exceptions in IntegrationRunner (#18989)
Browse files Browse the repository at this point in the history
* Hanlde error msgs in integration runner

* Add ConfigErrorException

* Some formatting
  • Loading branch information
akashkulk committed Nov 4, 2022
1 parent 1d809a7 commit 972a8c0
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 39 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.exceptions;

/**
* An exception that indicates that there is something wrong with the user's connector setup. This
* exception is caught and emits an AirbyteTraceMessage.
*/
public class ConfigErrorException extends RuntimeException {

private final String displayMessage;

public ConfigErrorException(final String displayMessage) {
super(displayMessage);
this.displayMessage = displayMessage;
}

public ConfigErrorException(final String displayMessage, final Throwable exception) {
super(displayMessage, exception);
this.displayMessage = displayMessage;
}

public String getDisplayMessage() {
return displayMessage;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions.Procedure;
import io.airbyte.commons.string.Strings;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.integrations.base.errors.messages.ErrorMessage;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
Expand Down Expand Up @@ -105,53 +108,92 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
LOGGER.info("Command: {}", parsed.getCommand());
LOGGER.info("Integration config: {}", parsed);

switch (parsed.getCommand()) {
// common
case SPEC -> outputRecordCollector.accept(new AirbyteMessage().withType(Type.SPEC).withSpec(integration.spec()));
case CHECK -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
try {
validateConfig(integration.spec().getConnectionSpecification(), config, "CHECK");
} catch (final Exception e) {
// if validation fails don't throw an exception, return a failed connection check message
outputRecordCollector.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(
new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.FAILED).withMessage(e.getMessage())));
}
try {
switch (parsed.getCommand()) {
// common
case SPEC -> outputRecordCollector.accept(new AirbyteMessage().withType(Type.SPEC).withSpec(integration.spec()));
case CHECK -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
try {
validateConfig(integration.spec().getConnectionSpecification(), config, "CHECK");
} catch (final Exception e) {
// if validation fails don't throw an exception, return a failed connection check message
outputRecordCollector.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(
new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.FAILED).withMessage(e.getMessage())));
}

outputRecordCollector.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(integration.check(config)));
}
// source only
case DISCOVER -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "DISCOVER");
outputRecordCollector.accept(new AirbyteMessage().withType(Type.CATALOG).withCatalog(source.discover(config)));
}
// todo (cgardens) - it is incongruous that that read and write return airbyte message (the
// envelope) while the other commands return what goes inside it.
case READ -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "READ");
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
final Optional<JsonNode> stateOptional = parsed.getStatePath().map(IntegrationRunner::parseConfig);
try (final AutoCloseableIterator<AirbyteMessage> messageIterator = source.read(config, catalog, stateOptional.orElse(null))) {
produceMessages(messageIterator);
outputRecordCollector.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(integration.check(config)));
}
}
// destination only
case WRITE -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE");
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
try (final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, outputRecordCollector)) {
runConsumer(consumer);
// source only
case DISCOVER -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "DISCOVER");
outputRecordCollector.accept(new AirbyteMessage().withType(Type.CATALOG).withCatalog(source.discover(config)));
}
// todo (cgardens) - it is incongruous that that read and write return airbyte message (the
// envelope) while the other commands return what goes inside it.
case READ -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "READ");
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
final Optional<JsonNode> stateOptional = parsed.getStatePath().map(IntegrationRunner::parseConfig);
try (final AutoCloseableIterator<AirbyteMessage> messageIterator = source.read(config, catalog, stateOptional.orElse(null))) {
produceMessages(messageIterator);
}
}
// destination only
case WRITE -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE");
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
try (final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, outputRecordCollector)) {
runConsumer(consumer);
}
}
default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand());
}
default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand());
} catch (final Exception e) {
final String displayMessage = getDisplayMessage(e);
// If the source connector throws a config error, a trace message with the relevant message should
// be surfaced.
if (isConfigError(e)) {
AirbyteTraceMessageUtility.emitConfigErrorTrace(e, displayMessage);
}
if (parsed.getCommand().equals(Command.CHECK)) {
// Currently, special handling is required for the SPEC case since the user display information in
// the trace message is
// not properly surfaced to the FE. In the future, we can remove this and just throw an exception.
outputRecordCollector
.accept(
new AirbyteMessage()
.withType(Type.CONNECTION_STATUS)
.withConnectionStatus(
new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage(displayMessage)));
return;
}
throw e;
}

LOGGER.info("Completed integration: {}", integration.getClass().getName());
}

private boolean isConfigError(final Exception e) {
return e instanceof ConfigErrorException || e instanceof ConnectionErrorException;
}

private String getDisplayMessage(final Exception e) {
if (e instanceof ConfigErrorException) {
return ((ConfigErrorException) e).getDisplayMessage();
} else if (e instanceof ConnectionErrorException) {
final ConnectionErrorException connEx = (ConnectionErrorException) e;
return ErrorMessage.getErrorMessage(connEx.getStateCode(), connEx.getErrorCode(), connEx.getExceptionMessage(), e);
} else {
return "Could not connect with provided configuration. Error: " + e.getMessage();
}
}

private void produceMessages(final AutoCloseableIterator<AirbyteMessage> messageIterator) throws Exception {
watchForOrphanThreads(
() -> messageIterator.forEachRemaining(outputRecordCollector),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import static io.airbyte.integrations.base.errors.messages.ErrorMessage.getErrorMessage;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.airbyte.commons.exceptions.ConnectionErrorException;
Expand Down

0 comments on commit 972a8c0

Please sign in to comment.