diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerMetrics.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerMetrics.java index c769b70d2..dbe34bb80 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerMetrics.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerMetrics.java @@ -40,7 +40,7 @@ public class LogMinerMetrics extends Metrics implements LogMinerMetricsMXBean { LogMinerMetrics(CdcSourceTaskContext taskContext) { super(taskContext, "log-miner"); - maxBatchSize.set(2000); + maxBatchSize.set(5_000); millisecondToSleepBetweenMiningQuery.set(1000); fetchedRecordSizeLimitToFallAsleep.set(50); @@ -161,7 +161,7 @@ public int getFetchedRecordSizeLimitToFallAsleep() { // MBean accessible setters @Override public void setMaxBatchSize(int size) { - if (size >= 100 && size <= 10_000) { + if (size >= 100 && size <= 20_000) { maxBatchSize.set(size); } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java index bce109de4..7efc5700e 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java @@ -128,7 +128,7 @@ public int processResult(ResultSet resultSet) { // DDL if (operationCode == RowMapper.DDL) { - LOGGER.debug("DDL: {}, REDO_SQL {}", logMessage, redo_sql); + LOGGER.debug("DDL: {}, REDO_SQL: {}", logMessage, redo_sql); continue; // todo parse, add to the collection. } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java index d3eedeeb2..4819a0f1a 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java @@ -155,7 +155,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio transactionalBuffer.abandonLongTransactions(nextOldestScn); LOGGER.debug("After abandoning, offset before: {}, offset after:{}", offsetContext.getScn(), nextOldestScn); offsetContext.setScn(nextOldestScn); - lastProcessedScn = transactionalBuffer.getLargestScn().equals(BigDecimal.ZERO) ? nextScn : transactionalBuffer.getLargestScn().longValue(); + updateStartScn(); }); LogMinerHelper.setRedoLogFilesForMining(connection, lastProcessedScn); @@ -181,15 +181,15 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio metronome.pause(); } + updateStartScn(); + // get largest scn from the last uncommitted transaction and set as last processed scn LOGGER.trace("largest scn = {}", transactionalBuffer.getLargestScn()); - lastProcessedScn = transactionalBuffer.getLargestScn().equals(BigDecimal.ZERO) ? nextScn : transactionalBuffer.getLargestScn().longValue(); - // update SCN in offset context only if buffer is empty, otherwise we update offset in TransactionalBuffer if (transactionalBuffer.isEmpty()) { offsetContext.setScn(lastProcessedScn); - transactionalBuffer.resetLargestScn(); + transactionalBuffer.resetLargestScn(null); } res.close(); @@ -217,6 +217,16 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio } } + private void updateStartScn() { + long startScn = transactionalBuffer.getLargestScn().equals(BigDecimal.ZERO) ? nextScn : transactionalBuffer.getLargestScn().longValue(); + if (startScn == lastProcessedScn) { + transactionalBuffer.resetLargestScn(nextScn); + lastProcessedScn = nextScn; + } else { + lastProcessedScn = startScn; + } + } + @Override public void commitOffset(Map offset) { // nothing to do diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java index 0b29cb76e..7c2c22523 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java @@ -82,15 +82,18 @@ public final class TransactionalBuffer { * @return largest last SCN in the buffer among all transactions */ public BigDecimal getLargestScn() { - calculateLargestScn(); return largestScn; } /** - * Reset Largest SCNs + * Reset Largest SCN */ - public void resetLargestScn() { - largestScn = BigDecimal.ZERO; + public void resetLargestScn(Long value) { + if (value != null) { + largestScn = new BigDecimal(value); + } else { + largestScn = BigDecimal.ZERO; + } } /** @@ -274,10 +277,11 @@ boolean rollback(String transactionId, String debugMessage) { metrics.ifPresent(m -> m.setActiveTransactions(transactions.size())); metrics.ifPresent(TransactionalBufferMetrics::incrementRolledBackTransactions); metrics.ifPresent(m -> m.addRolledBackTransactionId(transactionId)); + + transactions.remove(transactionId); return true; } - transactions.remove(transactionId); return false; } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java index ad385edc0..3bccfdc18 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java @@ -97,6 +97,8 @@ public void testIsEmptyWhenTransactionIsRolledBack() { assertThat(transactionalBuffer.isEmpty()).isEqualTo(true); } + // todo more tests + @Test public void testCalculateSmallestScnWhenTransactionIsCommitted() throws InterruptedException { CountDownLatch commitLatch = new CountDownLatch(1);