Skip to content

Commit

Permalink
Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreyIg committed Mar 17, 2020
2 parents 0536eab + 2075352 commit 40dd12f
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class LogMinerMetrics extends Metrics implements LogMinerMetricsMXBean {
LogMinerMetrics(CdcSourceTaskContext taskContext) {
super(taskContext, "log-miner");

maxBatchSize.set(2000);
maxBatchSize.set(5_000);
millisecondToSleepBetweenMiningQuery.set(1000);
fetchedRecordSizeLimitToFallAsleep.set(50);

Expand Down Expand Up @@ -161,7 +161,7 @@ public int getFetchedRecordSizeLimitToFallAsleep() {
// MBean accessible setters
@Override
public void setMaxBatchSize(int size) {
if (size >= 100 && size <= 10_000) {
if (size >= 100 && size <= 20_000) {
maxBatchSize.set(size);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public int processResult(ResultSet resultSet) {

// DDL
if (operationCode == RowMapper.DDL) {
LOGGER.debug("DDL: {}, REDO_SQL {}", logMessage, redo_sql);
LOGGER.debug("DDL: {}, REDO_SQL: {}", logMessage, redo_sql);
continue;
// todo parse, add to the collection.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
transactionalBuffer.abandonLongTransactions(nextOldestScn);
LOGGER.debug("After abandoning, offset before: {}, offset after:{}", offsetContext.getScn(), nextOldestScn);
offsetContext.setScn(nextOldestScn);
lastProcessedScn = transactionalBuffer.getLargestScn().equals(BigDecimal.ZERO) ? nextScn : transactionalBuffer.getLargestScn().longValue();
updateStartScn();
});

LogMinerHelper.setRedoLogFilesForMining(connection, lastProcessedScn);
Expand All @@ -181,15 +181,15 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
metronome.pause();
}

updateStartScn();

// get largest scn from the last uncommitted transaction and set as last processed scn
LOGGER.trace("largest scn = {}", transactionalBuffer.getLargestScn());

lastProcessedScn = transactionalBuffer.getLargestScn().equals(BigDecimal.ZERO) ? nextScn : transactionalBuffer.getLargestScn().longValue();

// update SCN in offset context only if buffer is empty, otherwise we update offset in TransactionalBuffer
if (transactionalBuffer.isEmpty()) {
offsetContext.setScn(lastProcessedScn);
transactionalBuffer.resetLargestScn();
transactionalBuffer.resetLargestScn(null);
}

res.close();
Expand Down Expand Up @@ -217,6 +217,16 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
}
}

private void updateStartScn() {
long startScn = transactionalBuffer.getLargestScn().equals(BigDecimal.ZERO) ? nextScn : transactionalBuffer.getLargestScn().longValue();
if (startScn == lastProcessedScn) {
transactionalBuffer.resetLargestScn(nextScn);
lastProcessedScn = nextScn;
} else {
lastProcessedScn = startScn;
}
}

@Override
public void commitOffset(Map<String, ?> offset) {
// nothing to do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,18 @@ public final class TransactionalBuffer {
* @return largest last SCN in the buffer among all transactions
*/
public BigDecimal getLargestScn() {
calculateLargestScn();
return largestScn;
}

/**
* Reset Largest SCNs
* Reset Largest SCN
*/
public void resetLargestScn() {
largestScn = BigDecimal.ZERO;
public void resetLargestScn(Long value) {
if (value != null) {
largestScn = new BigDecimal(value);
} else {
largestScn = BigDecimal.ZERO;
}
}

/**
Expand Down Expand Up @@ -274,10 +277,11 @@ boolean rollback(String transactionId, String debugMessage) {
metrics.ifPresent(m -> m.setActiveTransactions(transactions.size()));
metrics.ifPresent(TransactionalBufferMetrics::incrementRolledBackTransactions);
metrics.ifPresent(m -> m.addRolledBackTransactionId(transactionId));

transactions.remove(transactionId);
return true;
}

transactions.remove(transactionId);
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ public void testIsEmptyWhenTransactionIsRolledBack() {
assertThat(transactionalBuffer.isEmpty()).isEqualTo(true);
}

// todo more tests

@Test
public void testCalculateSmallestScnWhenTransactionIsCommitted() throws InterruptedException {
CountDownLatch commitLatch = new CountDownLatch(1);
Expand Down

0 comments on commit 40dd12f

Please sign in to comment.