Skip to content

Commit

Permalink
Merge pull request debezium#38 in N4FRA/debezium from DSOPS-101_sessi…
Browse files Browse the repository at this point in the history
…on_loops_after_rollback_1 to master

Squashed commit of the following:

commit 123bcc651acef2a31ab81dc07a337bf996857833
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Mar 16 13:14:45 2020 -0700

    DSOPS-101, mining session loops after rollback

commit 4343d84e1477fb0ad15730fd48ff587ae49aea2a
Merge: 996c0c0f 0536eab
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Mar 16 13:13:49 2020 -0700

    Merge branch 'master' into DSOPS-101_session_loops_after_rollback_1

commit 0536eab
Merge: 76f9f80 e30cfbd
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu Mar 12 13:31:51 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 996c0c0ff4c7989eb2469a8c2486de6f80e44484
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu Mar 12 13:11:16 2020 -0700

    DSOPS-101_session_loops_after_rollback

commit f49ae0bb1e205f7dce13c1ea342262cdd0b57ee8
Merge: 59b65a1b 76f9f80
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu Mar 12 13:04:47 2020 -0700

    Merge branch 'master' into DSOPS-101_session_loops_after_rollback

commit 76f9f80
Merge: 77e567e af6f8a3
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Mar 11 12:33:09 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 59b65a1ba27436e75774593d35a76a1379b5e830
Merge: ddd3c186 77e567e
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Mar 9 12:08:39 2020 -0700

    Merge branch 'master' into DSOPS-101_session_loops_after_rollback

commit 77e567e
Merge: 8e3d922 0585b2b
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Mar 9 12:07:58 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit ddd3c1867ca7fd5aea6f0d54c8b431e8bc6648f1
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Mar 9 11:16:49 2020 -0700

    DSOPS-101, mining session loops after rollback

commit 8e3d922
Merge: a98bb75 d4bc528
Author: AndreyIg <gnyiny@gmail.com>
Date:   Sat Mar 7 04:26:18 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit a98bb75
Merge: c78c368 a23eb5a
Author: AndreyIg <gnyiny@gmail.com>
Date:   Sat Mar 7 04:12:31 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit c78c368
Merge: 90bcc19 4619fcd
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Mar 6 06:52:42 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 90bcc19
Merge: b5d1ea7 3e3aeea
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Mar 2 14:31:07 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit b5d1ea7
Merge: 9686041 51f0dcb
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Feb 26 17:17:38 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 9686041
Merge: 926c648 4996a49
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Feb 26 12:02:35 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 926c648
Merge: 92140a3 829206c
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Feb 26 10:49:29 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 92140a3
Merge: 9fa48df 15a6c6c
Author: AndreyIg <gnyiny@gmail.com>
Date:   Tue Feb 25 18:14:58 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 9fa48df
Merge: d3da472 27eb9af
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Feb 14 16:11:29 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit d3da472
Merge: 86f3f65 081731f
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Feb 3 16:18:33 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 86f3f65
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Feb 3 16:02:43 2020 -0800

    DSCON-117, DBConnector exception while incremental loading - revert

    This reverts commit c3a6023.

... and 19 more commits
  • Loading branch information
Ignatenko Andrey committed Mar 16, 2020
1 parent e30cfbd commit 2075352
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class LogMinerMetrics extends Metrics implements LogMinerMetricsMXBean {
LogMinerMetrics(CdcSourceTaskContext taskContext) {
super(taskContext, "log-miner");

maxBatchSize.set(2000);
maxBatchSize.set(5_000);
millisecondToSleepBetweenMiningQuery.set(1000);
fetchedRecordSizeLimitToFallAsleep.set(50);

Expand Down Expand Up @@ -161,7 +161,7 @@ public int getFetchedRecordSizeLimitToFallAsleep() {
// MBean accessible setters
@Override
public void setMaxBatchSize(int size) {
if (size >= 100 && size <= 10_000) {
if (size >= 100 && size <= 20_000) {
maxBatchSize.set(size);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public int processResult(ResultSet resultSet) {

// DDL
if (operationCode == RowMapper.DDL) {
LOGGER.debug("DDL: {}, REDO_SQL {}", logMessage, redo_sql);
LOGGER.debug("DDL: {}, REDO_SQL: {}", logMessage, redo_sql);
continue;
// todo parse, add to the collection.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
transactionalBuffer.abandonLongTransactions(nextOldestScn);
LOGGER.debug("After abandoning, offset before: {}, offset after:{}", offsetContext.getScn(), nextOldestScn);
offsetContext.setScn(nextOldestScn);
lastProcessedScn = transactionalBuffer.getLargestScn().equals(BigDecimal.ZERO) ? nextScn : transactionalBuffer.getLargestScn().longValue();
updateStartScn();
});

LogMinerHelper.setRedoLogFilesForMining(connection, lastProcessedScn);
Expand All @@ -181,15 +181,15 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
metronome.pause();
}

updateStartScn();

// get largest scn from the last uncommitted transaction and set as last processed scn
LOGGER.trace("largest scn = {}", transactionalBuffer.getLargestScn());

lastProcessedScn = transactionalBuffer.getLargestScn().equals(BigDecimal.ZERO) ? nextScn : transactionalBuffer.getLargestScn().longValue();

// update SCN in offset context only if buffer is empty, otherwise we update offset in TransactionalBuffer
if (transactionalBuffer.isEmpty()) {
offsetContext.setScn(lastProcessedScn);
transactionalBuffer.resetLargestScn();
transactionalBuffer.resetLargestScn(null);
}

res.close();
Expand Down Expand Up @@ -217,6 +217,16 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
}
}

private void updateStartScn() {
long startScn = transactionalBuffer.getLargestScn().equals(BigDecimal.ZERO) ? nextScn : transactionalBuffer.getLargestScn().longValue();
if (startScn == lastProcessedScn) {
transactionalBuffer.resetLargestScn(nextScn);
lastProcessedScn = nextScn;
} else {
lastProcessedScn = startScn;
}
}

@Override
public void commitOffset(Map<String, ?> offset) {
// nothing to do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,18 @@ public final class TransactionalBuffer {
* @return largest last SCN in the buffer among all transactions
*/
public BigDecimal getLargestScn() {
calculateLargestScn();
return largestScn;
}

/**
* Reset Largest SCNs
* Reset Largest SCN
*/
public void resetLargestScn() {
largestScn = BigDecimal.ZERO;
public void resetLargestScn(Long value) {
if (value != null) {
largestScn = new BigDecimal(value);
} else {
largestScn = BigDecimal.ZERO;
}
}

/**
Expand Down Expand Up @@ -274,10 +277,11 @@ boolean rollback(String transactionId, String debugMessage) {
metrics.ifPresent(m -> m.setActiveTransactions(transactions.size()));
metrics.ifPresent(TransactionalBufferMetrics::incrementRolledBackTransactions);
metrics.ifPresent(m -> m.addRolledBackTransactionId(transactionId));

transactions.remove(transactionId);
return true;
}

transactions.remove(transactionId);
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ public void testIsEmptyWhenTransactionIsRolledBack() {
assertThat(transactionalBuffer.isEmpty()).isEqualTo(true);
}

// todo more tests

@Test
public void testCalculateSmallestScnWhenTransactionIsCommitted() throws InterruptedException {
CountDownLatch commitLatch = new CountDownLatch(1);
Expand Down

0 comments on commit 2075352

Please sign in to comment.