Skip to content

Commit

Permalink
Merge pull request debezium#40 in N4FRA/debezium from DSCON-187_throw…
Browse files Browse the repository at this point in the history
…s_Cannot_parse_NULL to master

Squashed commit of the following:

commit 2ad78e34664bbdd8e7cb5ad5d701671355bb734b
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Mar 18 13:56:38 2020 -0700

    DSCON-187, throws Cannot parse NULL

commit 50d11dd
Merge: 40dd12f 9191658
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Mar 16 17:20:37 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 40dd12f
Merge: 0536eab 2075352
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Mar 16 17:15:12 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 0536eab
Merge: 76f9f80 e30cfbd
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu Mar 12 13:31:51 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 76f9f80
Merge: 77e567e af6f8a3
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Mar 11 12:33:09 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 77e567e
Merge: 8e3d922 0585b2b
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Mar 9 12:07:58 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 8e3d922
Merge: a98bb75 d4bc528
Author: AndreyIg <gnyiny@gmail.com>
Date:   Sat Mar 7 04:26:18 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit a98bb75
Merge: c78c368 a23eb5a
Author: AndreyIg <gnyiny@gmail.com>
Date:   Sat Mar 7 04:12:31 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit c78c368
Merge: 90bcc19 4619fcd
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Mar 6 06:52:42 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 90bcc19
Merge: b5d1ea7 3e3aeea
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Mar 2 14:31:07 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit b5d1ea7
Merge: 9686041 51f0dcb
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Feb 26 17:17:38 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 9686041
Merge: 926c648 4996a49
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Feb 26 12:02:35 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 926c648
Merge: 92140a3 829206c
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Feb 26 10:49:29 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 92140a3
Merge: 9fa48df 15a6c6c
Author: AndreyIg <gnyiny@gmail.com>
Date:   Tue Feb 25 18:14:58 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 9fa48df
Merge: d3da472 27eb9af
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Feb 14 16:11:29 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit d3da472
Merge: 86f3f65 081731f
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Feb 3 16:18:33 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 86f3f65
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Feb 3 16:02:43 2020 -0800

    DSCON-117, DBConnector exception while incremental loading - revert

    This reverts commit c3a6023.

commit c3a6023
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Feb 3 15:56:18 2020 -0800

    DSCON-117, DBConnector exception while incremental loading

commit 90f1823
Merge: 605d22f 8f53bba
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Feb 3 13:52:09 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 605d22f
Merge: 276c19b e68ada6
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Jan 24 07:32:11 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

... and 16 more commits
  • Loading branch information
Ignatenko Andrey committed Mar 18, 2020
1 parent 9191658 commit 6c11363
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public int processResult(ResultSet resultSet) {
LogMinerRowLcr rowLcr = dmlParser.getDmlChange();
LOGGER.trace("parsed record: {}" , rowLcr);
if (rowLcr == null || redo_sql == null) {
LOGGER.error("Following statement was not parsed: {}, details: {}", redo_sql, logMessage);
LOGGER.warn("Following statement was not parsed: {}, details: {}", redo_sql, logMessage);
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,13 @@ public final class TransactionalBuffer {
private final ErrorHandler errorHandler;
private Optional<TransactionalBufferMetrics> metrics;
private final Set<String> abandonedTransactionIds;

// storing rolledBackTransactionIds is for debugging purposes to check what was rolled back to research, todo delete in future releases
private final Set<String> rolledBackTransactionIds;

// It holds the latest captured uncommitted SCN.
// This number tracks starting point for the next mining cycle.
// This gets increased by 1 on each COMMIT or ROLLBACK to avoid reading COMMITTED or ROLLEDBACK transactions on the next mining loop.
private BigDecimal largestScn;

/**
Expand All @@ -78,13 +83,19 @@ public final class TransactionalBuffer {
}

/**
*
* @return largest last SCN in the buffer among all transactions
*/
public BigDecimal getLargestScn() {
return largestScn;
}

/**
* @return rolled back transactions
*/
public Set<String> getRolledBackTransactionIds() {
return new HashSet<>(rolledBackTransactionIds);
}

/**
* Reset Largest SCN
*/
Expand Down Expand Up @@ -229,6 +240,7 @@ void abandonLongTransactions(Long thresholdScn) {
iter.remove();

calculateLargestScn();
calculateSmallestScn();

metrics.ifPresent(t -> t.addAbandonedTransactionId(transaction.getKey()));
metrics.ifPresent(t -> t.decrementCapturedDmlCounter(transaction.getValue().commitCallbacks.size()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public void testStartLogMinerStatement() {

@Test
public void testBlacklistFiltering() throws Exception {
OracleChangeRecordValueConverter converters = new OracleChangeRecordValueConverter(null);

ddlParser = new OracleDdlParser(true, CATALOG_NAME, SCHEMA_NAME);
tables = new Tables();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,61 +95,115 @@ public void testIsEmptyWhenTransactionIsRolledBack() {
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> { });
transactionalBuffer.rollback(TRANSACTION_ID, "");
assertThat(transactionalBuffer.isEmpty()).isEqualTo(true);
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(SCN.add(BigDecimal.ONE));
}

// todo more tests
@Test
public void testNonEmptyFirstTransactionIsRolledBack() {
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "insert", (timestamp, smallestScn) -> { });
transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn) -> { });
transactionalBuffer.rollback(TRANSACTION_ID, "");
assertThat(transactionalBuffer.isEmpty()).isEqualTo(false);
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN);
assertThat(transactionalBuffer.getRolledBackTransactionIds().contains(TRANSACTION_ID)).isTrue();
assertThat(transactionalBuffer.getRolledBackTransactionIds().contains(OTHER_TRANSACTION_ID)).isFalse();
}

@Test
public void testCalculateSmallestScnWhenTransactionIsCommitted() throws InterruptedException {
public void testNonEmptySecondTransactionIsRolledBack() {
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> { });
transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn) -> { });
transactionalBuffer.rollback(OTHER_TRANSACTION_ID, "");
assertThat(transactionalBuffer.isEmpty()).isEqualTo(false);
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN.add(BigDecimal.ONE));
assertThat(transactionalBuffer.getRolledBackTransactionIds().contains(TRANSACTION_ID)).isFalse();
assertThat(transactionalBuffer.getRolledBackTransactionIds().contains(OTHER_TRANSACTION_ID)).isTrue();
}

@Test
public void testCalculateScnWhenTransactionIsCommitted() throws InterruptedException {
CountDownLatch commitLatch = new CountDownLatch(1);
AtomicReference<BigDecimal> smallestScnContainer = new AtomicReference<>();
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> {
smallestScnContainer.set(smallestScn);
commitLatch.countDown();
});
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(SCN); // before commit
transactionalBuffer.commit(TRANSACTION_ID, TIMESTAMP, () -> true, MESSAGE);
commitLatch.await();
assertThat(smallestScnContainer.get()).isEqualTo(SCN);
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(SCN.add(BigDecimal.ONE)); // after commit

assertThat(smallestScnContainer.get()).isNull();

assertThat(transactionalBuffer.getRolledBackTransactionIds().isEmpty()).isTrue();
}

@Test
public void testCalculateSmallestScnWhenFirstTransactionIsCommitted() throws InterruptedException {
public void testCalculateScnWhenFirstTransactionIsCommitted() throws InterruptedException {
CountDownLatch commitLatch = new CountDownLatch(1);
AtomicReference<BigDecimal> smallestScnContainer = new AtomicReference<>();
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> {
smallestScnContainer.set(smallestScn);
commitLatch.countDown();
});
transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn) -> { });
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); // before commit
transactionalBuffer.commit(TRANSACTION_ID, TIMESTAMP, () -> true, MESSAGE);
commitLatch.await();
assertThat(smallestScnContainer.get()).isEqualTo(SCN);
// after commit, it stays the same because OTHER_TRANSACTION_ID is not committed yet
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN);

assertThat(smallestScnContainer.get()).isEqualTo(OTHER_SCN);
assertThat(transactionalBuffer.getRolledBackTransactionIds().isEmpty()).isTrue();
}

@Test
public void testCalculateSmallestScnWhenSecondTransactionIsCommitted() throws InterruptedException {
public void testCalculateScnWhenSecondTransactionIsCommitted() throws InterruptedException {
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> { });
CountDownLatch commitLatch = new CountDownLatch(1);
AtomicReference<BigDecimal> smallestScnContainer = new AtomicReference<>();
transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn) -> {
smallestScnContainer.set(smallestScn);
commitLatch.countDown();
});
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); // before commit
transactionalBuffer.commit(OTHER_TRANSACTION_ID, TIMESTAMP, () -> true, MESSAGE);
commitLatch.await();
assertThat(smallestScnContainer.get()).isEqualTo(SCN);
// after committing OTHER_TRANSACTION_ID
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN.add(BigDecimal.ONE));
assertThat(transactionalBuffer.getRolledBackTransactionIds().isEmpty()).isTrue();
}

@Test
public void testAbandoningTransaction() {
public void testResetLargestScn() {
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> { });
transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn) -> {});
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); // before commit
transactionalBuffer.commit(OTHER_TRANSACTION_ID, TIMESTAMP, () -> true, MESSAGE);
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN.add(BigDecimal.ONE)); // after commit

transactionalBuffer.resetLargestScn(null);
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(BigDecimal.ZERO);
transactionalBuffer.resetLargestScn(OTHER_SCN.longValue());
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN);
}

@Test
public void testAbandoningOneTransaction() {
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> {});
transactionalBuffer.abandonLongTransactions(SCN.longValue());
assertThat(transactionalBuffer.isEmpty()).isEqualTo(true);
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(BigDecimal.ZERO);
}

@Test
public void testAbandoningTransactionHavingAnotherOne() {
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> {});
transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn) -> {});
transactionalBuffer.abandonLongTransactions(SCN.longValue());
assertThat(transactionalBuffer.isEmpty()).isEqualTo(false);
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN);
}

@Test
Expand Down

0 comments on commit 6c11363

Please sign in to comment.