diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java index 7efc5700e..9012593cf 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java @@ -152,7 +152,7 @@ public int processResult(ResultSet resultSet) { LogMinerRowLcr rowLcr = dmlParser.getDmlChange(); LOGGER.trace("parsed record: {}" , rowLcr); if (rowLcr == null || redo_sql == null) { - LOGGER.error("Following statement was not parsed: {}, details: {}", redo_sql, logMessage); + LOGGER.warn("Following statement was not parsed: {}, details: {}", redo_sql, logMessage); continue; } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java index 7281f017d..74ec1514a 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java @@ -51,8 +51,13 @@ public final class TransactionalBuffer { private final ErrorHandler errorHandler; private Optional metrics; private final Set abandonedTransactionIds; + // storing rolledBackTransactionIds is for debugging purposes to check what was rolled back to research, todo delete in future releases private final Set rolledBackTransactionIds; + + // It holds the latest captured uncommitted SCN. + // This number tracks starting point for the next mining cycle. + // This gets increased by 1 on each COMMIT or ROLLBACK to avoid reading COMMITTED or ROLLEDBACK transactions on the next mining loop. private BigDecimal largestScn; /** @@ -78,13 +83,19 @@ public final class TransactionalBuffer { } /** - * * @return largest last SCN in the buffer among all transactions */ public BigDecimal getLargestScn() { return largestScn; } + /** + * @return rolled back transactions + */ + public Set getRolledBackTransactionIds() { + return new HashSet<>(rolledBackTransactionIds); + } + /** * Reset Largest SCN */ @@ -229,6 +240,7 @@ void abandonLongTransactions(Long thresholdScn) { iter.remove(); calculateLargestScn(); + calculateSmallestScn(); metrics.ifPresent(t -> t.addAbandonedTransactionId(transaction.getKey())); metrics.ifPresent(t -> t.decrementCapturedDmlCounter(transaction.getValue().commitCallbacks.size())); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerUtilsTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerUtilsTest.java index 959b9dad1..87a99aaca 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerUtilsTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerUtilsTest.java @@ -59,7 +59,6 @@ public void testStartLogMinerStatement() { @Test public void testBlacklistFiltering() throws Exception { - OracleChangeRecordValueConverter converters = new OracleChangeRecordValueConverter(null); ddlParser = new OracleDdlParser(true, CATALOG_NAME, SCHEMA_NAME); tables = new Tables(); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java index 3bccfdc18..1044d20f9 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java @@ -95,25 +95,51 @@ public void testIsEmptyWhenTransactionIsRolledBack() { transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> { }); transactionalBuffer.rollback(TRANSACTION_ID, ""); assertThat(transactionalBuffer.isEmpty()).isEqualTo(true); + assertThat(transactionalBuffer.getLargestScn()).isEqualTo(SCN.add(BigDecimal.ONE)); } - // todo more tests + @Test + public void testNonEmptyFirstTransactionIsRolledBack() { + transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "insert", (timestamp, smallestScn) -> { }); + transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn) -> { }); + transactionalBuffer.rollback(TRANSACTION_ID, ""); + assertThat(transactionalBuffer.isEmpty()).isEqualTo(false); + assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); + assertThat(transactionalBuffer.getRolledBackTransactionIds().contains(TRANSACTION_ID)).isTrue(); + assertThat(transactionalBuffer.getRolledBackTransactionIds().contains(OTHER_TRANSACTION_ID)).isFalse(); + } @Test - public void testCalculateSmallestScnWhenTransactionIsCommitted() throws InterruptedException { + public void testNonEmptySecondTransactionIsRolledBack() { + transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> { }); + transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn) -> { }); + transactionalBuffer.rollback(OTHER_TRANSACTION_ID, ""); + assertThat(transactionalBuffer.isEmpty()).isEqualTo(false); + assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN.add(BigDecimal.ONE)); + assertThat(transactionalBuffer.getRolledBackTransactionIds().contains(TRANSACTION_ID)).isFalse(); + assertThat(transactionalBuffer.getRolledBackTransactionIds().contains(OTHER_TRANSACTION_ID)).isTrue(); + } + + @Test + public void testCalculateScnWhenTransactionIsCommitted() throws InterruptedException { CountDownLatch commitLatch = new CountDownLatch(1); AtomicReference smallestScnContainer = new AtomicReference<>(); transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> { smallestScnContainer.set(smallestScn); commitLatch.countDown(); }); + assertThat(transactionalBuffer.getLargestScn()).isEqualTo(SCN); // before commit transactionalBuffer.commit(TRANSACTION_ID, TIMESTAMP, () -> true, MESSAGE); commitLatch.await(); - assertThat(smallestScnContainer.get()).isEqualTo(SCN); + assertThat(transactionalBuffer.getLargestScn()).isEqualTo(SCN.add(BigDecimal.ONE)); // after commit + + assertThat(smallestScnContainer.get()).isNull(); + + assertThat(transactionalBuffer.getRolledBackTransactionIds().isEmpty()).isTrue(); } @Test - public void testCalculateSmallestScnWhenFirstTransactionIsCommitted() throws InterruptedException { + public void testCalculateScnWhenFirstTransactionIsCommitted() throws InterruptedException { CountDownLatch commitLatch = new CountDownLatch(1); AtomicReference smallestScnContainer = new AtomicReference<>(); transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> { @@ -121,13 +147,18 @@ public void testCalculateSmallestScnWhenFirstTransactionIsCommitted() throws Int commitLatch.countDown(); }); transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn) -> { }); + assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); // before commit transactionalBuffer.commit(TRANSACTION_ID, TIMESTAMP, () -> true, MESSAGE); commitLatch.await(); - assertThat(smallestScnContainer.get()).isEqualTo(SCN); + // after commit, it stays the same because OTHER_TRANSACTION_ID is not committed yet + assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); + + assertThat(smallestScnContainer.get()).isEqualTo(OTHER_SCN); + assertThat(transactionalBuffer.getRolledBackTransactionIds().isEmpty()).isTrue(); } @Test - public void testCalculateSmallestScnWhenSecondTransactionIsCommitted() throws InterruptedException { + public void testCalculateScnWhenSecondTransactionIsCommitted() throws InterruptedException { transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> { }); CountDownLatch commitLatch = new CountDownLatch(1); AtomicReference smallestScnContainer = new AtomicReference<>(); @@ -135,21 +166,44 @@ public void testCalculateSmallestScnWhenSecondTransactionIsCommitted() throws In smallestScnContainer.set(smallestScn); commitLatch.countDown(); }); + assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); // before commit transactionalBuffer.commit(OTHER_TRANSACTION_ID, TIMESTAMP, () -> true, MESSAGE); commitLatch.await(); assertThat(smallestScnContainer.get()).isEqualTo(SCN); + // after committing OTHER_TRANSACTION_ID + assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN.add(BigDecimal.ONE)); + assertThat(transactionalBuffer.getRolledBackTransactionIds().isEmpty()).isTrue(); } @Test - public void testAbandoningTransaction() { + public void testResetLargestScn() { + transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> { }); + transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn) -> {}); + assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); // before commit + transactionalBuffer.commit(OTHER_TRANSACTION_ID, TIMESTAMP, () -> true, MESSAGE); + assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN.add(BigDecimal.ONE)); // after commit + + transactionalBuffer.resetLargestScn(null); + assertThat(transactionalBuffer.getLargestScn()).isEqualTo(BigDecimal.ZERO); + transactionalBuffer.resetLargestScn(OTHER_SCN.longValue()); + assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); + } + + @Test + public void testAbandoningOneTransaction() { transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> {}); transactionalBuffer.abandonLongTransactions(SCN.longValue()); assertThat(transactionalBuffer.isEmpty()).isEqualTo(true); + assertThat(transactionalBuffer.getLargestScn()).isEqualTo(BigDecimal.ZERO); + } + @Test + public void testAbandoningTransactionHavingAnotherOne() { transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> {}); transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn) -> {}); transactionalBuffer.abandonLongTransactions(SCN.longValue()); assertThat(transactionalBuffer.isEmpty()).isEqualTo(false); + assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); } @Test