Skip to content

Commit

Permalink
Fail sync when Debezium fails to shutdown properly (#31674)
Browse files Browse the repository at this point in the history
Co-authored-by: akashkulk <akashkulk@users.noreply.github.com>
  • Loading branch information
akashkulk and akashkulk committed Oct 24, 2023
1 parent 0f2fbe6 commit a390830
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 4 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.1.12 | 2023-10-24 | [\#31674](https://github.com/airbytehq/airbyte/pull/31674) | Fail sync when Debezium does not shut down properly. |
| 0.1.11 | 2023-10-18 | [\#31486](https://github.com/airbytehq/airbyte/pull/31486) | Update constants in AdaptiveSourceRunner. |
| 0.1.9 | 2023-10-12 | [\#31309](https://github.com/airbytehq/airbyte/pull/31309) | Use toPlainString() when handling BigDecimals in PostgresConverter |
| 0.1.8 | 2023-10-11 | [\#31322](https://github.com/airbytehq/airbyte/pull/31322) | Cap log line length to 32KB to prevent loss of records |
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/java/airbyte-cdk/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ subprojects { subproject ->
repositories {
maven {
name 'airbyte-public-jars'
url 'https://airbyte.mycloudrepo.io/public/repositories/airbyte-public-jars/'
url 'https://airbyte.mycloudrepo.io/repositories/airbyte-public-jars/'
credentials {
username System.getenv('CLOUDREPO_USER')
password System.getenv('CLOUDREPO_PASSWORD')
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.1.11
version=0.1.12
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,22 @@ public void start(final BlockingQueue<ChangeEvent<String, String>> queue) {
if (e.value() != null) {
try {
queue.put(e);
} catch (InterruptedException ex) {
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
throw new RuntimeException(ex);
}
}
})
.using((success, message, error) -> {
LOGGER.info("Debezium engine shutdown.");
LOGGER.info("Debezium engine shutdown. Engine terminated successfully : {}", success);
LOGGER.info(message);
thrownError.set(error);
// If debezium has not shutdown correctly, it can indicate an error with the connector configuration
// or a partial sync success.
// In situations like these, the preference is to fail loud and clear.
if (thrownError.get() != null && !success) {
thrownError.set(new RuntimeException(message));
}
engineLatch.countDown();
})
.build();
Expand Down

0 comments on commit a390830

Please sign in to comment.