From 2075352c06cfd120df9f74da61af340683ec36f5 Mon Sep 17 00:00:00 2001 From: Ignatenko Andrey Date: Mon, 16 Mar 2020 15:15:51 -0700 Subject: [PATCH] Merge pull request #38 in N4FRA/debezium from DSOPS-101_session_loops_after_rollback_1 to master Squashed commit of the following: commit 123bcc651acef2a31ab81dc07a337bf996857833 Author: AndreyIg Date: Mon Mar 16 13:14:45 2020 -0700 DSOPS-101, mining session loops after rollback commit 4343d84e1477fb0ad15730fd48ff587ae49aea2a Merge: 996c0c0f 0536eab2 Author: AndreyIg Date: Mon Mar 16 13:13:49 2020 -0700 Merge branch 'master' into DSOPS-101_session_loops_after_rollback_1 commit 0536eab2fc68d097f4d355618acba05ea6dd0c07 Merge: 76f9f808 e30cfbd2 Author: AndreyIg Date: Thu Mar 12 13:31:51 2020 -0700 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit 996c0c0ff4c7989eb2469a8c2486de6f80e44484 Author: AndreyIg Date: Thu Mar 12 13:11:16 2020 -0700 DSOPS-101_session_loops_after_rollback commit f49ae0bb1e205f7dce13c1ea342262cdd0b57ee8 Merge: 59b65a1b 76f9f808 Author: AndreyIg Date: Thu Mar 12 13:04:47 2020 -0700 Merge branch 'master' into DSOPS-101_session_loops_after_rollback commit 76f9f8087d8249f224241df8b24c0285be238daf Merge: 77e567e5 af6f8a3b Author: AndreyIg Date: Wed Mar 11 12:33:09 2020 -0700 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit 59b65a1ba27436e75774593d35a76a1379b5e830 Merge: ddd3c186 77e567e5 Author: AndreyIg Date: Mon Mar 9 12:08:39 2020 -0700 Merge branch 'master' into DSOPS-101_session_loops_after_rollback commit 77e567e595f0c5307127610c8b29c8697cc80878 Merge: 8e3d9224 0585b2b7 Author: AndreyIg Date: Mon Mar 9 12:07:58 2020 -0700 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit ddd3c1867ca7fd5aea6f0d54c8b431e8bc6648f1 Author: AndreyIg Date: Mon Mar 9 11:16:49 2020 -0700 DSOPS-101, mining session loops after rollback commit 8e3d92245229ca553969d069b803ce90d96a8e30 Merge: a98bb754 d4bc5286 Author: AndreyIg Date: Sat Mar 7 04:26:18 2020 -0800 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit a98bb75401a6902b0576ec68c1247c5a7355af68 Merge: c78c3685 a23eb5a5 Author: AndreyIg Date: Sat Mar 7 04:12:31 2020 -0800 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit c78c36858d45b91eee837cf4189379c198667e47 Merge: 90bcc19c 4619fcd0 Author: AndreyIg Date: Fri Mar 6 06:52:42 2020 -0800 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit 90bcc19c13467ab8cc017c11140a646c39049064 Merge: b5d1ea79 3e3aeea3 Author: AndreyIg Date: Mon Mar 2 14:31:07 2020 -0800 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit b5d1ea790cd729289ac36d1b2c7a42ae8fb1a3ec Merge: 96860411 51f0dcb4 Author: AndreyIg Date: Wed Feb 26 17:17:38 2020 -0800 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit 96860411512d809463f0e884db25133583e1ef26 Merge: 926c648d 4996a49a Author: AndreyIg Date: Wed Feb 26 12:02:35 2020 -0800 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit 926c648dc23b5ecb90134e37b2d97c117fd543b2 Merge: 92140a3c 829206cb Author: AndreyIg Date: Wed Feb 26 10:49:29 2020 -0800 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit 92140a3c1aeda313befaeea340c9a4ceb6e3a182 Merge: 9fa48dfc 15a6c6c1 Author: AndreyIg Date: Tue Feb 25 18:14:58 2020 -0800 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit 9fa48dfca4b65fbb166b55f9aa5fb5624eccb161 Merge: d3da472a 27eb9af1 Author: AndreyIg Date: Fri Feb 14 16:11:29 2020 -0800 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit d3da472a90914a57707ab8dce0656dda47751a16 Merge: 86f3f656 081731f6 Author: AndreyIg Date: Mon Feb 3 16:18:33 2020 -0800 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit 86f3f656d76b7c66ece05dc3f3388352a6293ffb Author: AndreyIg Date: Mon Feb 3 16:02:43 2020 -0800 DSCON-117, DBConnector exception while incremental loading - revert This reverts commit c3a6023605a9a9281b01924efb45eb81b6a68bf3. ... and 19 more commits --- .../oracle/logminer/LogMinerMetrics.java | 4 ++-- .../logminer/LogMinerQueryResultProcessor.java | 2 +- .../LogMinerStreamingChangeEventSource.java | 18 ++++++++++++++---- .../oracle/logminer/TransactionalBuffer.java | 14 +++++++++----- .../logminer/TransactionalBufferTest.java | 2 ++ 5 files changed, 28 insertions(+), 12 deletions(-) diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerMetrics.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerMetrics.java index c769b70d2..dbe34bb80 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerMetrics.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerMetrics.java @@ -40,7 +40,7 @@ public class LogMinerMetrics extends Metrics implements LogMinerMetricsMXBean { LogMinerMetrics(CdcSourceTaskContext taskContext) { super(taskContext, "log-miner"); - maxBatchSize.set(2000); + maxBatchSize.set(5_000); millisecondToSleepBetweenMiningQuery.set(1000); fetchedRecordSizeLimitToFallAsleep.set(50); @@ -161,7 +161,7 @@ public int getFetchedRecordSizeLimitToFallAsleep() { // MBean accessible setters @Override public void setMaxBatchSize(int size) { - if (size >= 100 && size <= 10_000) { + if (size >= 100 && size <= 20_000) { maxBatchSize.set(size); } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java index bce109de4..7efc5700e 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java @@ -128,7 +128,7 @@ public int processResult(ResultSet resultSet) { // DDL if (operationCode == RowMapper.DDL) { - LOGGER.debug("DDL: {}, REDO_SQL {}", logMessage, redo_sql); + LOGGER.debug("DDL: {}, REDO_SQL: {}", logMessage, redo_sql); continue; // todo parse, add to the collection. } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java index d3eedeeb2..4819a0f1a 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java @@ -155,7 +155,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio transactionalBuffer.abandonLongTransactions(nextOldestScn); LOGGER.debug("After abandoning, offset before: {}, offset after:{}", offsetContext.getScn(), nextOldestScn); offsetContext.setScn(nextOldestScn); - lastProcessedScn = transactionalBuffer.getLargestScn().equals(BigDecimal.ZERO) ? nextScn : transactionalBuffer.getLargestScn().longValue(); + updateStartScn(); }); LogMinerHelper.setRedoLogFilesForMining(connection, lastProcessedScn); @@ -181,15 +181,15 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio metronome.pause(); } + updateStartScn(); + // get largest scn from the last uncommitted transaction and set as last processed scn LOGGER.trace("largest scn = {}", transactionalBuffer.getLargestScn()); - lastProcessedScn = transactionalBuffer.getLargestScn().equals(BigDecimal.ZERO) ? nextScn : transactionalBuffer.getLargestScn().longValue(); - // update SCN in offset context only if buffer is empty, otherwise we update offset in TransactionalBuffer if (transactionalBuffer.isEmpty()) { offsetContext.setScn(lastProcessedScn); - transactionalBuffer.resetLargestScn(); + transactionalBuffer.resetLargestScn(null); } res.close(); @@ -217,6 +217,16 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio } } + private void updateStartScn() { + long startScn = transactionalBuffer.getLargestScn().equals(BigDecimal.ZERO) ? nextScn : transactionalBuffer.getLargestScn().longValue(); + if (startScn == lastProcessedScn) { + transactionalBuffer.resetLargestScn(nextScn); + lastProcessedScn = nextScn; + } else { + lastProcessedScn = startScn; + } + } + @Override public void commitOffset(Map offset) { // nothing to do diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java index 0b29cb76e..7c2c22523 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java @@ -82,15 +82,18 @@ public final class TransactionalBuffer { * @return largest last SCN in the buffer among all transactions */ public BigDecimal getLargestScn() { - calculateLargestScn(); return largestScn; } /** - * Reset Largest SCNs + * Reset Largest SCN */ - public void resetLargestScn() { - largestScn = BigDecimal.ZERO; + public void resetLargestScn(Long value) { + if (value != null) { + largestScn = new BigDecimal(value); + } else { + largestScn = BigDecimal.ZERO; + } } /** @@ -274,10 +277,11 @@ boolean rollback(String transactionId, String debugMessage) { metrics.ifPresent(m -> m.setActiveTransactions(transactions.size())); metrics.ifPresent(TransactionalBufferMetrics::incrementRolledBackTransactions); metrics.ifPresent(m -> m.addRolledBackTransactionId(transactionId)); + + transactions.remove(transactionId); return true; } - transactions.remove(transactionId); return false; } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java index ad385edc0..3bccfdc18 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java @@ -97,6 +97,8 @@ public void testIsEmptyWhenTransactionIsRolledBack() { assertThat(transactionalBuffer.isEmpty()).isEqualTo(true); } + // todo more tests + @Test public void testCalculateSmallestScnWhenTransactionIsCommitted() throws InterruptedException { CountDownLatch commitLatch = new CountDownLatch(1);