Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Postgres source: handle terminate connection exception with test. #19887

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
280c633
Source postgres: catch termination connection PSQLException
sashaNeshcheret Nov 18, 2022
e06a304
Source postgres: move common code to util method
sashaNeshcheret Nov 21, 2022
a36b3a0
Source postgres: clean code
sashaNeshcheret Nov 22, 2022
da3127f
Source postgres: review fixes and added unit tests
sashaNeshcheret Nov 23, 2022
f01acb9
Source postgres: clean code
sashaNeshcheret Nov 29, 2022
cf72f52
Merge branch 'master' into omneshcheret/802-postgres-source-handle-te…
sashaNeshcheret Nov 29, 2022
114d4aa
Source postgres: bump version
sashaNeshcheret Nov 29, 2022
3214114
Source postgres: format
sashaNeshcheret Nov 29, 2022
5fa6030
Source postgres: format
sashaNeshcheret Nov 29, 2022
dd9c22d
Source postgres: bump version
sashaNeshcheret Nov 29, 2022
36ea69d
Merge branch 'master' into omneshcheret/802-postgres-source-handle-te…
sashaNeshcheret Nov 30, 2022
189af03
Merge branch 'master' into omneshcheret/802-postgres-source-handle-te…
VitaliiMaltsev Nov 30, 2022
9061da6
Source postgres: fix failing test and bump version
sashaNeshcheret Nov 30, 2022
990521c
Merge remote-tracking branch 'origin/omneshcheret/802-postgres-source…
sashaNeshcheret Nov 30, 2022
bb554bb
Merge branch 'master' into omneshcheret/802-postgres-source-handle-te…
sashaNeshcheret Nov 30, 2022
3d0da9c
Merge branch 'master' into omneshcheret/802-postgres-source-handle-te…
sashaNeshcheret Nov 30, 2022
9eedbd1
Source mysql: make message visible for tests
sashaNeshcheret Nov 30, 2022
ac70345
Merge remote-tracking branch 'origin/omneshcheret/802-postgres-source…
sashaNeshcheret Nov 30, 2022
93d851b
Merge branch 'master' into omneshcheret/802-postgres-source-handle-te…
sashaNeshcheret Nov 30, 2022
1594eba
Source postgres: reformat import
sashaNeshcheret Nov 30, 2022
929c2e5
Merge remote-tracking branch 'origin/omneshcheret/802-postgres-source…
sashaNeshcheret Nov 30, 2022
38b98a4
Merge branch 'master' into omneshcheret/802-postgres-source-handle-te…
akashkulk Dec 2, 2022
81a6063
Merge remote-tracking branch 'origin/master' into omneshcheret/802-po…
sashaNeshcheret Dec 5, 2022
166b1fa
Merge branch 'master' into omneshcheret/802-postgres-source-handle-te…
sashaNeshcheret Dec 6, 2022
d4467c2
Merge branch 'master' into omneshcheret/802-postgres-source-handle-te…
rodireich Dec 6, 2022
090b135
Merge branch 'master' into omneshcheret/802-postgres-source-handle-te…
sashaNeshcheret Dec 6, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,12 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions.Procedure;
import io.airbyte.commons.string.Strings;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.integrations.base.errors.messages.ErrorMessage;
import io.airbyte.integrations.util.ConnectorExceptionUtil;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
Expand Down Expand Up @@ -157,11 +155,11 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
// to
// find the root exception that corresponds to a configuration error. If that does not exist, we
// just return the original exception.
final Throwable rootThrowable = getRootConfigError(e);
final String displayMessage = getDisplayMessage(rootThrowable);
final Throwable rootThrowable = ConnectorExceptionUtil.getRootConfigError(e);
final String displayMessage = ConnectorExceptionUtil.getDisplayMessage(rootThrowable);
// If the source connector throws a config error, a trace message with the relevant message should
// be surfaced.
if (isConfigError(rootThrowable)) {
if (ConnectorExceptionUtil.isConfigError(rootThrowable)) {
AirbyteTraceMessageUtility.emitConfigErrorTrace(e, displayMessage);
}
if (parsed.getCommand().equals(Command.CHECK)) {
Expand All @@ -184,37 +182,6 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
LOGGER.info("Completed integration: {}", integration.getClass().getName());
}

/**
* Returns the first instance of an exception associated with a configuration error (if it exists).
* Otherwise, the original exception is returned.
*/
private Throwable getRootConfigError(final Exception e) {
Throwable current = e;
while (current != null) {
if (isConfigError(current)) {
return current;
} else {
current = current.getCause();
}
}
return e;
}

private boolean isConfigError(final Throwable e) {
return e instanceof ConfigErrorException || e instanceof ConnectionErrorException;
}

private String getDisplayMessage(final Throwable e) {
if (e instanceof ConfigErrorException) {
return ((ConfigErrorException) e).getDisplayMessage();
} else if (e instanceof ConnectionErrorException) {
final ConnectionErrorException connEx = (ConnectionErrorException) e;
return ErrorMessage.getErrorMessage(connEx.getStateCode(), connEx.getErrorCode(), connEx.getExceptionMessage(), connEx);
} else {
return "Could not connect with provided configuration. Error: " + e.getMessage() != null ? e.getMessage() : "";
}
}

private void produceMessages(final AutoCloseableIterator<AirbyteMessage> messageIterator) throws Exception {
watchForOrphanThreads(
() -> messageIterator.forEachRemaining(outputRecordCollector),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.util;

import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import io.airbyte.integrations.base.errors.messages.ErrorMessage;
import java.sql.SQLException;
import java.util.List;
import java.util.Locale;
import java.util.function.Predicate;

/**
* Utility class defining methods for handling configuration exceptions in connectors.
*/
public class ConnectorExceptionUtil {

public static final String COMMON_EXCEPTION_MESSAGE_TEMPLATE = "Could not connect with provided configuration. Error: %s";
static final String RECOVERY_CONNECTION_ERROR_MESSAGE =
"We're having issues syncing from a Postgres replica that is configured as a hot standby server. " +
"Please see https://docs.airbyte.com/integrations/sources/postgres/#sync-data-from-postgres-hot-standby-server for options and workarounds";
private static final List<Predicate<Throwable>> configErrorPredicates =
List.of(getConfigErrorPredicate(), getConnectionErrorPredicate(), isRecoveryConnectionExceptionPredicate());

public static boolean isConfigError(final Throwable e) {
return configErrorPredicates.stream().anyMatch(predicate -> predicate.test(e));
}

public static String getDisplayMessage(final Throwable e) {
if (e instanceof ConfigErrorException) {
return ((ConfigErrorException) e).getDisplayMessage();
} else if (e instanceof ConnectionErrorException) {
final ConnectionErrorException connEx = (ConnectionErrorException) e;
return ErrorMessage.getErrorMessage(connEx.getStateCode(), connEx.getErrorCode(), connEx.getExceptionMessage(), connEx);
} else if (isRecoveryConnectionExceptionPredicate().test(e)) {
return RECOVERY_CONNECTION_ERROR_MESSAGE;
} else {
return String.format(COMMON_EXCEPTION_MESSAGE_TEMPLATE, e.getMessage() != null ? e.getMessage() : "");
}
}

/**
* Returns the first instance of an exception associated with a configuration error (if it exists).
* Otherwise, the original exception is returned.
*/
public static Throwable getRootConfigError(final Exception e) {
Throwable current = e;
while (current != null) {
if (ConnectorExceptionUtil.isConfigError(current)) {
return current;
} else {
current = current.getCause();
}
}
return e;
}

private static Predicate<Throwable> getConfigErrorPredicate() {
return e -> e instanceof ConfigErrorException;
}

private static Predicate<Throwable> getConnectionErrorPredicate() {
return e -> e instanceof ConnectionErrorException;
}

private static Predicate<Throwable> isRecoveryConnectionExceptionPredicate() {
return e -> e instanceof SQLException && e.getMessage()
.toLowerCase(Locale.ROOT)
.contains("due to conflict with recovery");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.base;

import static io.airbyte.integrations.util.ConnectorExceptionUtil.COMMON_EXCEPTION_MESSAGE_TEMPLATE;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -269,7 +270,8 @@ void testCheckNestedException() throws Exception {
@Test
void testCheckRuntimeException() throws Exception {
final IntegrationConfig intConfig = IntegrationConfig.check(configPath);
final AirbyteConnectionStatus output = new AirbyteConnectionStatus().withStatus(Status.FAILED).withMessage("Runtime Error");
final AirbyteConnectionStatus output =
new AirbyteConnectionStatus().withStatus(Status.FAILED).withMessage(String.format(COMMON_EXCEPTION_MESSAGE_TEMPLATE, "Runtime Error"));
final RuntimeException runtimeException = new RuntimeException("Runtime Error");

when(cliParser.parse(ARGS)).thenReturn(intConfig);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.util;

import static io.airbyte.integrations.util.ConnectorExceptionUtil.COMMON_EXCEPTION_MESSAGE_TEMPLATE;
import static io.airbyte.integrations.util.ConnectorExceptionUtil.RECOVERY_CONNECTION_ERROR_MESSAGE;
import static org.junit.jupiter.api.Assertions.*;

import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import java.sql.SQLException;
import org.junit.jupiter.api.Test;

class ConnectorExceptionUtilTest {

public static final String CONFIG_EXCEPTION_MESSAGE = "test message";
public static final String RECOVERY_EXCEPTION_MESSAGE = "FATAL: terminating connection due to conflict with recovery";
public static final String COMMON_EXCEPTION_MESSAGE = "something happens with connection";
public static final String CONNECTION_ERROR_MESSAGE_TEMPLATE = "State code: %s; Error code: %s; Message: %s";

@Test()
void isConfigErrorForConfigException() {
ConfigErrorException configErrorException = new ConfigErrorException(CONFIG_EXCEPTION_MESSAGE);
assertTrue(ConnectorExceptionUtil.isConfigError(configErrorException));

}

@Test
void isConfigErrorForConnectionException() {
ConnectionErrorException connectionErrorException = new ConnectionErrorException(CONFIG_EXCEPTION_MESSAGE);
assertTrue(ConnectorExceptionUtil.isConfigError(connectionErrorException));
}

@Test
void isConfigErrorForRecoveryPSQLException() {
SQLException recoveryPSQLException = new SQLException(RECOVERY_EXCEPTION_MESSAGE);
assertTrue(ConnectorExceptionUtil.isConfigError(recoveryPSQLException));
}

@Test
void isConfigErrorForCommonSQLException() {
SQLException recoveryPSQLException = new SQLException(COMMON_EXCEPTION_MESSAGE);
assertFalse(ConnectorExceptionUtil.isConfigError(recoveryPSQLException));
}

@Test
void isConfigErrorForCommonException() {
assertFalse(ConnectorExceptionUtil.isConfigError(new Exception()));
}

@Test
void getDisplayMessageForConfigException() {
ConfigErrorException configErrorException = new ConfigErrorException(CONFIG_EXCEPTION_MESSAGE);
String actualDisplayMessage = ConnectorExceptionUtil.getDisplayMessage(configErrorException);
assertEquals(CONFIG_EXCEPTION_MESSAGE, actualDisplayMessage);
}

@Test
void getDisplayMessageForConnectionError() {
String testCode = "test code";
int errorCode = -1;
ConnectionErrorException connectionErrorException = new ConnectionErrorException(testCode, errorCode, CONFIG_EXCEPTION_MESSAGE, new Exception());
String actualDisplayMessage = ConnectorExceptionUtil.getDisplayMessage(connectionErrorException);
assertEquals(String.format(CONNECTION_ERROR_MESSAGE_TEMPLATE, testCode, errorCode, CONFIG_EXCEPTION_MESSAGE), actualDisplayMessage);
}

@Test
void getDisplayMessageForRecoveryException() {
SQLException recoveryException = new SQLException(RECOVERY_EXCEPTION_MESSAGE);
String actualDisplayMessage = ConnectorExceptionUtil.getDisplayMessage(recoveryException);
assertEquals(RECOVERY_CONNECTION_ERROR_MESSAGE, actualDisplayMessage);
}

@Test
void getDisplayMessageForCommonException() {
Exception exception = new SQLException(COMMON_EXCEPTION_MESSAGE);
String actualDisplayMessage = ConnectorExceptionUtil.getDisplayMessage(exception);
assertEquals(String.format(COMMON_EXCEPTION_MESSAGE_TEMPLATE, COMMON_EXCEPTION_MESSAGE), actualDisplayMessage);
}

@Test
void getRootConfigErrorFromConfigException() {
ConfigErrorException configErrorException = new ConfigErrorException(CONFIG_EXCEPTION_MESSAGE);
Exception exception = new Exception(COMMON_EXCEPTION_MESSAGE, configErrorException);

Throwable actualRootConfigError = ConnectorExceptionUtil.getRootConfigError(exception);
assertEquals(configErrorException, actualRootConfigError);
}

@Test
void getRootConfigErrorFromRecoverySQLException() {
SQLException recoveryException = new SQLException(RECOVERY_EXCEPTION_MESSAGE);
RuntimeException runtimeException = new RuntimeException(COMMON_EXCEPTION_MESSAGE, recoveryException);
Exception exception = new Exception(runtimeException);

Throwable actualRootConfigError = ConnectorExceptionUtil.getRootConfigError(exception);
assertEquals(recoveryException, actualRootConfigError);
}

@Test
void getRootConfigErrorFromNonConfigException() {
SQLException configErrorException = new SQLException(CONFIG_EXCEPTION_MESSAGE);
Exception exception = new Exception(COMMON_EXCEPTION_MESSAGE, configErrorException);

Throwable actualRootConfigError = ConnectorExceptionUtil.getRootConfigError(exception);
assertEquals(exception, actualRootConfigError);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ ENV APPLICATION source-postgres-strict-encrypt
COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.30

LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ ENV APPLICATION source-postgres
COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.30

LABEL io.airbyte.name=airbyte/source-postgres