diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerHelper.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerHelper.java index bc95af8c2..cbe3684c6 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerHelper.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerHelper.java @@ -89,6 +89,7 @@ static void createAuditTable(Connection connection) throws SQLException { /** * This method returns next SCN for mining and also updates MBean metrics * We use a configurable limit, because the larger mining range, the slower query from Log Miner content view. + * In addition capturing unlimited number of changes can blow up Java heap. * Gradual querying helps to catch up faster after long delays in mining. * * @param connection container level database connection diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java index 73e8c1a80..4d1c15ced 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java @@ -220,16 +220,18 @@ int processResult(ResultSet resultSet) { * The criteria is the offset SCN remains the same in five mining cycles */ private void warnStuckScn() { - if (currentOffsetScn == offsetContext.getScn() && currentOffsetCommitScn != offsetContext.getCommitScn()) { - stuckScnCounter++; - // logWarn only once - if (stuckScnCounter == 5) { - LogMinerHelper.logWarn(transactionalBufferMetrics, - "Offset SCN {} did not change in five mining cycles, hence the oldest transaction was not committed. Offset commit SCN: {}", currentOffsetScn, offsetContext.getCommitScn()); - transactionalBufferMetrics.incrementScnFreezeCounter(); + if (offsetContext != null && offsetContext.getCommitScn() != null) { + if (currentOffsetScn == offsetContext.getScn() && currentOffsetCommitScn != offsetContext.getCommitScn()) { + stuckScnCounter++; + // logWarn only once + if (stuckScnCounter == 5) { + LogMinerHelper.logWarn(transactionalBufferMetrics, + "Offset SCN {} did not change in five mining cycles, hence the oldest transaction was not committed. Offset commit SCN: {}", currentOffsetScn, offsetContext.getCommitScn()); + transactionalBufferMetrics.incrementScnFreezeCounter(); + } + } else { + stuckScnCounter = 0; } - } else { - stuckScnCounter = 0; } } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetrics.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetrics.java index ca24a0430..e71d7190a 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetrics.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetrics.java @@ -68,7 +68,7 @@ public void setTimeDifference(AtomicLong timeDifference) { void calculateLagMetrics(Instant changeTime){ if (changeTime != null) { Instant correctedChangeTime = changeTime.plus(Duration.ofMillis(timeDifference.longValue())); - lagFromTheSource.set(Duration.between(correctedChangeTime, Instant.now())); + lagFromTheSource.set(Duration.between(correctedChangeTime, Instant.now()).abs()); if (maxLagFromTheSource.get().toMillis() < lagFromTheSource.get().toMillis()) { maxLagFromTheSource.set(lagFromTheSource.get());