From 972a8c041900c5daf460b07dad1dc77b9c4c384e Mon Sep 17 00:00:00 2001 From: Akash Kulkarni <113392464+akashkulk@users.noreply.github.com> Date: Fri, 4 Nov 2022 14:38:26 -0700 Subject: [PATCH] Handling configuration exceptions in IntegrationRunner (#18989) * Hanlde error msgs in integration runner * Add ConfigErrorException * Some formatting --- .../exceptions/ConfigErrorException.java | 29 +++++ .../integrations/base/IntegrationRunner.java | 118 ++++++++++++------ .../source/relationaldb/AbstractDbSource.java | 1 - 3 files changed, 109 insertions(+), 39 deletions(-) create mode 100644 airbyte-commons/src/main/java/io/airbyte/commons/exceptions/ConfigErrorException.java diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/exceptions/ConfigErrorException.java b/airbyte-commons/src/main/java/io/airbyte/commons/exceptions/ConfigErrorException.java new file mode 100644 index 00000000000000..2f095f2f5c718e --- /dev/null +++ b/airbyte-commons/src/main/java/io/airbyte/commons/exceptions/ConfigErrorException.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.exceptions; + +/** + * An exception that indicates that there is something wrong with the user's connector setup. This + * exception is caught and emits an AirbyteTraceMessage. + */ +public class ConfigErrorException extends RuntimeException { + + private final String displayMessage; + + public ConfigErrorException(final String displayMessage) { + super(displayMessage); + this.displayMessage = displayMessage; + } + + public ConfigErrorException(final String displayMessage, final Throwable exception) { + super(displayMessage, exception); + this.displayMessage = displayMessage; + } + + public String getDisplayMessage() { + return displayMessage; + } + +} diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java index 19f577e1bc6bc3..f23e913d784c9f 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java @@ -9,11 +9,14 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import io.airbyte.commons.exceptions.ConfigErrorException; +import io.airbyte.commons.exceptions.ConnectionErrorException; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions.Procedure; import io.airbyte.commons.string.Strings; import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.integrations.base.errors.messages.ErrorMessage; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; @@ -105,53 +108,92 @@ private void runInternal(final IntegrationConfig parsed) throws Exception { LOGGER.info("Command: {}", parsed.getCommand()); LOGGER.info("Integration config: {}", parsed); - switch (parsed.getCommand()) { - // common - case SPEC -> outputRecordCollector.accept(new AirbyteMessage().withType(Type.SPEC).withSpec(integration.spec())); - case CHECK -> { - final JsonNode config = parseConfig(parsed.getConfigPath()); - try { - validateConfig(integration.spec().getConnectionSpecification(), config, "CHECK"); - } catch (final Exception e) { - // if validation fails don't throw an exception, return a failed connection check message - outputRecordCollector.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus( - new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.FAILED).withMessage(e.getMessage()))); - } + try { + switch (parsed.getCommand()) { + // common + case SPEC -> outputRecordCollector.accept(new AirbyteMessage().withType(Type.SPEC).withSpec(integration.spec())); + case CHECK -> { + final JsonNode config = parseConfig(parsed.getConfigPath()); + try { + validateConfig(integration.spec().getConnectionSpecification(), config, "CHECK"); + } catch (final Exception e) { + // if validation fails don't throw an exception, return a failed connection check message + outputRecordCollector.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus( + new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.FAILED).withMessage(e.getMessage()))); + } - outputRecordCollector.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(integration.check(config))); - } - // source only - case DISCOVER -> { - final JsonNode config = parseConfig(parsed.getConfigPath()); - validateConfig(integration.spec().getConnectionSpecification(), config, "DISCOVER"); - outputRecordCollector.accept(new AirbyteMessage().withType(Type.CATALOG).withCatalog(source.discover(config))); - } - // todo (cgardens) - it is incongruous that that read and write return airbyte message (the - // envelope) while the other commands return what goes inside it. - case READ -> { - final JsonNode config = parseConfig(parsed.getConfigPath()); - validateConfig(integration.spec().getConnectionSpecification(), config, "READ"); - final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class); - final Optional stateOptional = parsed.getStatePath().map(IntegrationRunner::parseConfig); - try (final AutoCloseableIterator messageIterator = source.read(config, catalog, stateOptional.orElse(null))) { - produceMessages(messageIterator); + outputRecordCollector.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(integration.check(config))); } - } - // destination only - case WRITE -> { - final JsonNode config = parseConfig(parsed.getConfigPath()); - validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE"); - final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class); - try (final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, outputRecordCollector)) { - runConsumer(consumer); + // source only + case DISCOVER -> { + final JsonNode config = parseConfig(parsed.getConfigPath()); + validateConfig(integration.spec().getConnectionSpecification(), config, "DISCOVER"); + outputRecordCollector.accept(new AirbyteMessage().withType(Type.CATALOG).withCatalog(source.discover(config))); + } + // todo (cgardens) - it is incongruous that that read and write return airbyte message (the + // envelope) while the other commands return what goes inside it. + case READ -> { + final JsonNode config = parseConfig(parsed.getConfigPath()); + validateConfig(integration.spec().getConnectionSpecification(), config, "READ"); + final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class); + final Optional stateOptional = parsed.getStatePath().map(IntegrationRunner::parseConfig); + try (final AutoCloseableIterator messageIterator = source.read(config, catalog, stateOptional.orElse(null))) { + produceMessages(messageIterator); + } + } + // destination only + case WRITE -> { + final JsonNode config = parseConfig(parsed.getConfigPath()); + validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE"); + final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class); + try (final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, outputRecordCollector)) { + runConsumer(consumer); + } } + default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand()); } - default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand()); + } catch (final Exception e) { + final String displayMessage = getDisplayMessage(e); + // If the source connector throws a config error, a trace message with the relevant message should + // be surfaced. + if (isConfigError(e)) { + AirbyteTraceMessageUtility.emitConfigErrorTrace(e, displayMessage); + } + if (parsed.getCommand().equals(Command.CHECK)) { + // Currently, special handling is required for the SPEC case since the user display information in + // the trace message is + // not properly surfaced to the FE. In the future, we can remove this and just throw an exception. + outputRecordCollector + .accept( + new AirbyteMessage() + .withType(Type.CONNECTION_STATUS) + .withConnectionStatus( + new AirbyteConnectionStatus() + .withStatus(AirbyteConnectionStatus.Status.FAILED) + .withMessage(displayMessage))); + return; + } + throw e; } LOGGER.info("Completed integration: {}", integration.getClass().getName()); } + private boolean isConfigError(final Exception e) { + return e instanceof ConfigErrorException || e instanceof ConnectionErrorException; + } + + private String getDisplayMessage(final Exception e) { + if (e instanceof ConfigErrorException) { + return ((ConfigErrorException) e).getDisplayMessage(); + } else if (e instanceof ConnectionErrorException) { + final ConnectionErrorException connEx = (ConnectionErrorException) e; + return ErrorMessage.getErrorMessage(connEx.getStateCode(), connEx.getErrorCode(), connEx.getExceptionMessage(), e); + } else { + return "Could not connect with provided configuration. Error: " + e.getMessage(); + } + } + private void produceMessages(final AutoCloseableIterator messageIterator) throws Exception { watchForOrphanThreads( () -> messageIterator.forEachRemaining(outputRecordCollector), diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java index 7ea0ddcb8464e7..a02d4b8890be41 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java @@ -7,7 +7,6 @@ import static io.airbyte.integrations.base.errors.messages.ErrorMessage.getErrorMessage; import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import io.airbyte.commons.exceptions.ConnectionErrorException;