diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java index 0ea6ec7f2ea7f1..cda5830460e153 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java @@ -9,14 +9,12 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import io.airbyte.commons.exceptions.ConfigErrorException; -import io.airbyte.commons.exceptions.ConnectionErrorException; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions.Procedure; import io.airbyte.commons.string.Strings; import io.airbyte.commons.util.AutoCloseableIterator; -import io.airbyte.integrations.base.errors.messages.ErrorMessage; +import io.airbyte.integrations.util.ConnectorExceptionUtil; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; @@ -157,11 +155,11 @@ private void runInternal(final IntegrationConfig parsed) throws Exception { // to // find the root exception that corresponds to a configuration error. If that does not exist, we // just return the original exception. - final Throwable rootThrowable = getRootConfigError(e); - final String displayMessage = getDisplayMessage(rootThrowable); + final Throwable rootThrowable = ConnectorExceptionUtil.getRootConfigError(e); + final String displayMessage = ConnectorExceptionUtil.getDisplayMessage(rootThrowable); // If the source connector throws a config error, a trace message with the relevant message should // be surfaced. - if (isConfigError(rootThrowable)) { + if (ConnectorExceptionUtil.isConfigError(rootThrowable)) { AirbyteTraceMessageUtility.emitConfigErrorTrace(e, displayMessage); } if (parsed.getCommand().equals(Command.CHECK)) { @@ -184,37 +182,6 @@ private void runInternal(final IntegrationConfig parsed) throws Exception { LOGGER.info("Completed integration: {}", integration.getClass().getName()); } - /** - * Returns the first instance of an exception associated with a configuration error (if it exists). - * Otherwise, the original exception is returned. - */ - private Throwable getRootConfigError(final Exception e) { - Throwable current = e; - while (current != null) { - if (isConfigError(current)) { - return current; - } else { - current = current.getCause(); - } - } - return e; - } - - private boolean isConfigError(final Throwable e) { - return e instanceof ConfigErrorException || e instanceof ConnectionErrorException; - } - - private String getDisplayMessage(final Throwable e) { - if (e instanceof ConfigErrorException) { - return ((ConfigErrorException) e).getDisplayMessage(); - } else if (e instanceof ConnectionErrorException) { - final ConnectionErrorException connEx = (ConnectionErrorException) e; - return ErrorMessage.getErrorMessage(connEx.getStateCode(), connEx.getErrorCode(), connEx.getExceptionMessage(), connEx); - } else { - return "Could not connect with provided configuration. Error: " + e.getMessage() != null ? e.getMessage() : ""; - } - } - private void produceMessages(final AutoCloseableIterator messageIterator) throws Exception { watchForOrphanThreads( () -> messageIterator.forEachRemaining(outputRecordCollector), diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/util/ConnectorExceptionUtil.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/util/ConnectorExceptionUtil.java new file mode 100644 index 00000000000000..a87e85b3c92e96 --- /dev/null +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/util/ConnectorExceptionUtil.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.util; + +import io.airbyte.commons.exceptions.ConfigErrorException; +import io.airbyte.commons.exceptions.ConnectionErrorException; +import io.airbyte.integrations.base.errors.messages.ErrorMessage; +import java.sql.SQLException; +import java.util.List; +import java.util.Locale; +import java.util.function.Predicate; + +/** + * Utility class defining methods for handling configuration exceptions in connectors. + */ +public class ConnectorExceptionUtil { + + public static final String COMMON_EXCEPTION_MESSAGE_TEMPLATE = "Could not connect with provided configuration. Error: %s"; + static final String RECOVERY_CONNECTION_ERROR_MESSAGE = + "We're having issues syncing from a Postgres replica that is configured as a hot standby server. " + + "Please see https://docs.airbyte.com/integrations/sources/postgres/#sync-data-from-postgres-hot-standby-server for options and workarounds"; + private static final List> configErrorPredicates = + List.of(getConfigErrorPredicate(), getConnectionErrorPredicate(), isRecoveryConnectionExceptionPredicate()); + + public static boolean isConfigError(final Throwable e) { + return configErrorPredicates.stream().anyMatch(predicate -> predicate.test(e)); + } + + public static String getDisplayMessage(final Throwable e) { + if (e instanceof ConfigErrorException) { + return ((ConfigErrorException) e).getDisplayMessage(); + } else if (e instanceof ConnectionErrorException) { + final ConnectionErrorException connEx = (ConnectionErrorException) e; + return ErrorMessage.getErrorMessage(connEx.getStateCode(), connEx.getErrorCode(), connEx.getExceptionMessage(), connEx); + } else if (isRecoveryConnectionExceptionPredicate().test(e)) { + return RECOVERY_CONNECTION_ERROR_MESSAGE; + } else { + return String.format(COMMON_EXCEPTION_MESSAGE_TEMPLATE, e.getMessage() != null ? e.getMessage() : ""); + } + } + + /** + * Returns the first instance of an exception associated with a configuration error (if it exists). + * Otherwise, the original exception is returned. + */ + public static Throwable getRootConfigError(final Exception e) { + Throwable current = e; + while (current != null) { + if (ConnectorExceptionUtil.isConfigError(current)) { + return current; + } else { + current = current.getCause(); + } + } + return e; + } + + private static Predicate getConfigErrorPredicate() { + return e -> e instanceof ConfigErrorException; + } + + private static Predicate getConnectionErrorPredicate() { + return e -> e instanceof ConnectionErrorException; + } + + private static Predicate isRecoveryConnectionExceptionPredicate() { + return e -> e instanceof SQLException && e.getMessage() + .toLowerCase(Locale.ROOT) + .contains("due to conflict with recovery"); + } + +} diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java index 5b9bd4cb183f7f..eb00200a72f697 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java @@ -4,6 +4,7 @@ package io.airbyte.integrations.base; +import static io.airbyte.integrations.util.ConnectorExceptionUtil.COMMON_EXCEPTION_MESSAGE_TEMPLATE; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -269,7 +270,8 @@ void testCheckNestedException() throws Exception { @Test void testCheckRuntimeException() throws Exception { final IntegrationConfig intConfig = IntegrationConfig.check(configPath); - final AirbyteConnectionStatus output = new AirbyteConnectionStatus().withStatus(Status.FAILED).withMessage("Runtime Error"); + final AirbyteConnectionStatus output = + new AirbyteConnectionStatus().withStatus(Status.FAILED).withMessage(String.format(COMMON_EXCEPTION_MESSAGE_TEMPLATE, "Runtime Error")); final RuntimeException runtimeException = new RuntimeException("Runtime Error"); when(cliParser.parse(ARGS)).thenReturn(intConfig); diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/util/ConnectorExceptionUtilTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/util/ConnectorExceptionUtilTest.java new file mode 100644 index 00000000000000..2b040478b58567 --- /dev/null +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/util/ConnectorExceptionUtilTest.java @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.util; + +import static io.airbyte.integrations.util.ConnectorExceptionUtil.COMMON_EXCEPTION_MESSAGE_TEMPLATE; +import static io.airbyte.integrations.util.ConnectorExceptionUtil.RECOVERY_CONNECTION_ERROR_MESSAGE; +import static org.junit.jupiter.api.Assertions.*; + +import io.airbyte.commons.exceptions.ConfigErrorException; +import io.airbyte.commons.exceptions.ConnectionErrorException; +import java.sql.SQLException; +import org.junit.jupiter.api.Test; + +class ConnectorExceptionUtilTest { + + public static final String CONFIG_EXCEPTION_MESSAGE = "test message"; + public static final String RECOVERY_EXCEPTION_MESSAGE = "FATAL: terminating connection due to conflict with recovery"; + public static final String COMMON_EXCEPTION_MESSAGE = "something happens with connection"; + public static final String CONNECTION_ERROR_MESSAGE_TEMPLATE = "State code: %s; Error code: %s; Message: %s"; + + @Test() + void isConfigErrorForConfigException() { + ConfigErrorException configErrorException = new ConfigErrorException(CONFIG_EXCEPTION_MESSAGE); + assertTrue(ConnectorExceptionUtil.isConfigError(configErrorException)); + + } + + @Test + void isConfigErrorForConnectionException() { + ConnectionErrorException connectionErrorException = new ConnectionErrorException(CONFIG_EXCEPTION_MESSAGE); + assertTrue(ConnectorExceptionUtil.isConfigError(connectionErrorException)); + } + + @Test + void isConfigErrorForRecoveryPSQLException() { + SQLException recoveryPSQLException = new SQLException(RECOVERY_EXCEPTION_MESSAGE); + assertTrue(ConnectorExceptionUtil.isConfigError(recoveryPSQLException)); + } + + @Test + void isConfigErrorForCommonSQLException() { + SQLException recoveryPSQLException = new SQLException(COMMON_EXCEPTION_MESSAGE); + assertFalse(ConnectorExceptionUtil.isConfigError(recoveryPSQLException)); + } + + @Test + void isConfigErrorForCommonException() { + assertFalse(ConnectorExceptionUtil.isConfigError(new Exception())); + } + + @Test + void getDisplayMessageForConfigException() { + ConfigErrorException configErrorException = new ConfigErrorException(CONFIG_EXCEPTION_MESSAGE); + String actualDisplayMessage = ConnectorExceptionUtil.getDisplayMessage(configErrorException); + assertEquals(CONFIG_EXCEPTION_MESSAGE, actualDisplayMessage); + } + + @Test + void getDisplayMessageForConnectionError() { + String testCode = "test code"; + int errorCode = -1; + ConnectionErrorException connectionErrorException = new ConnectionErrorException(testCode, errorCode, CONFIG_EXCEPTION_MESSAGE, new Exception()); + String actualDisplayMessage = ConnectorExceptionUtil.getDisplayMessage(connectionErrorException); + assertEquals(String.format(CONNECTION_ERROR_MESSAGE_TEMPLATE, testCode, errorCode, CONFIG_EXCEPTION_MESSAGE), actualDisplayMessage); + } + + @Test + void getDisplayMessageForRecoveryException() { + SQLException recoveryException = new SQLException(RECOVERY_EXCEPTION_MESSAGE); + String actualDisplayMessage = ConnectorExceptionUtil.getDisplayMessage(recoveryException); + assertEquals(RECOVERY_CONNECTION_ERROR_MESSAGE, actualDisplayMessage); + } + + @Test + void getDisplayMessageForCommonException() { + Exception exception = new SQLException(COMMON_EXCEPTION_MESSAGE); + String actualDisplayMessage = ConnectorExceptionUtil.getDisplayMessage(exception); + assertEquals(String.format(COMMON_EXCEPTION_MESSAGE_TEMPLATE, COMMON_EXCEPTION_MESSAGE), actualDisplayMessage); + } + + @Test + void getRootConfigErrorFromConfigException() { + ConfigErrorException configErrorException = new ConfigErrorException(CONFIG_EXCEPTION_MESSAGE); + Exception exception = new Exception(COMMON_EXCEPTION_MESSAGE, configErrorException); + + Throwable actualRootConfigError = ConnectorExceptionUtil.getRootConfigError(exception); + assertEquals(configErrorException, actualRootConfigError); + } + + @Test + void getRootConfigErrorFromRecoverySQLException() { + SQLException recoveryException = new SQLException(RECOVERY_EXCEPTION_MESSAGE); + RuntimeException runtimeException = new RuntimeException(COMMON_EXCEPTION_MESSAGE, recoveryException); + Exception exception = new Exception(runtimeException); + + Throwable actualRootConfigError = ConnectorExceptionUtil.getRootConfigError(exception); + assertEquals(recoveryException, actualRootConfigError); + } + + @Test + void getRootConfigErrorFromNonConfigException() { + SQLException configErrorException = new SQLException(CONFIG_EXCEPTION_MESSAGE); + Exception exception = new Exception(COMMON_EXCEPTION_MESSAGE, configErrorException); + + Throwable actualRootConfigError = ConnectorExceptionUtil.getRootConfigError(exception); + assertEquals(exception, actualRootConfigError); + } + +} diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile index 64daa81e49516b..efd60b2a85f9b7 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile @@ -17,4 +17,5 @@ ENV APPLICATION source-postgres-strict-encrypt COPY --from=build /airbyte /airbyte LABEL io.airbyte.version=1.0.30 + LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index 91b072f1a9683f..3c313422baac0d 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -17,4 +17,5 @@ ENV APPLICATION source-postgres COPY --from=build /airbyte /airbyte LABEL io.airbyte.version=1.0.30 + LABEL io.airbyte.name=airbyte/source-postgres