Skip to content

Commit

Permalink
Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreyIg committed Mar 18, 2020
2 parents 50d11dd + 6c11363 commit 89feb14
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public int processResult(ResultSet resultSet) {
LogMinerRowLcr rowLcr = dmlParser.getDmlChange();
LOGGER.trace("parsed record: {}" , rowLcr);
if (rowLcr == null || redo_sql == null) {
LOGGER.error("Following statement was not parsed: {}, details: {}", redo_sql, logMessage);
LOGGER.warn("Following statement was not parsed: {}, details: {}", redo_sql, logMessage);
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,13 @@ public final class TransactionalBuffer {
private final ErrorHandler errorHandler;
private Optional<TransactionalBufferMetrics> metrics;
private final Set<String> abandonedTransactionIds;

// storing rolledBackTransactionIds is for debugging purposes to check what was rolled back to research, todo delete in future releases
private final Set<String> rolledBackTransactionIds;

// It holds the latest captured uncommitted SCN.
// This number tracks starting point for the next mining cycle.
// This gets increased by 1 on each COMMIT or ROLLBACK to avoid reading COMMITTED or ROLLEDBACK transactions on the next mining loop.
private BigDecimal largestScn;

/**
Expand All @@ -78,13 +83,19 @@ public final class TransactionalBuffer {
}

/**
*
* @return largest last SCN in the buffer among all transactions
*/
public BigDecimal getLargestScn() {
return largestScn;
}

/**
* @return rolled back transactions
*/
public Set<String> getRolledBackTransactionIds() {
return new HashSet<>(rolledBackTransactionIds);
}

/**
* Reset Largest SCN
*/
Expand Down Expand Up @@ -229,6 +240,7 @@ void abandonLongTransactions(Long thresholdScn) {
iter.remove();

calculateLargestScn();
calculateSmallestScn();

metrics.ifPresent(t -> t.addAbandonedTransactionId(transaction.getKey()));
metrics.ifPresent(t -> t.decrementCapturedDmlCounter(transaction.getValue().commitCallbacks.size()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public void testStartLogMinerStatement() {

@Test
public void testBlacklistFiltering() throws Exception {
OracleChangeRecordValueConverter converters = new OracleChangeRecordValueConverter(null);

ddlParser = new OracleDdlParser(true, CATALOG_NAME, SCHEMA_NAME);
tables = new Tables();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,61 +95,115 @@ public void testIsEmptyWhenTransactionIsRolledBack() {
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> { });
transactionalBuffer.rollback(TRANSACTION_ID, "");
assertThat(transactionalBuffer.isEmpty()).isEqualTo(true);
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(SCN.add(BigDecimal.ONE));
}

// todo more tests
@Test
public void testNonEmptyFirstTransactionIsRolledBack() {
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "insert", (timestamp, smallestScn) -> { });
transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn) -> { });
transactionalBuffer.rollback(TRANSACTION_ID, "");
assertThat(transactionalBuffer.isEmpty()).isEqualTo(false);
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN);
assertThat(transactionalBuffer.getRolledBackTransactionIds().contains(TRANSACTION_ID)).isTrue();
assertThat(transactionalBuffer.getRolledBackTransactionIds().contains(OTHER_TRANSACTION_ID)).isFalse();
}

@Test
public void testCalculateSmallestScnWhenTransactionIsCommitted() throws InterruptedException {
public void testNonEmptySecondTransactionIsRolledBack() {
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> { });
transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn) -> { });
transactionalBuffer.rollback(OTHER_TRANSACTION_ID, "");
assertThat(transactionalBuffer.isEmpty()).isEqualTo(false);
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN.add(BigDecimal.ONE));
assertThat(transactionalBuffer.getRolledBackTransactionIds().contains(TRANSACTION_ID)).isFalse();
assertThat(transactionalBuffer.getRolledBackTransactionIds().contains(OTHER_TRANSACTION_ID)).isTrue();
}

@Test
public void testCalculateScnWhenTransactionIsCommitted() throws InterruptedException {
CountDownLatch commitLatch = new CountDownLatch(1);
AtomicReference<BigDecimal> smallestScnContainer = new AtomicReference<>();
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> {
smallestScnContainer.set(smallestScn);
commitLatch.countDown();
});
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(SCN); // before commit
transactionalBuffer.commit(TRANSACTION_ID, TIMESTAMP, () -> true, MESSAGE);
commitLatch.await();
assertThat(smallestScnContainer.get()).isEqualTo(SCN);
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(SCN.add(BigDecimal.ONE)); // after commit

assertThat(smallestScnContainer.get()).isNull();

assertThat(transactionalBuffer.getRolledBackTransactionIds().isEmpty()).isTrue();
}

@Test
public void testCalculateSmallestScnWhenFirstTransactionIsCommitted() throws InterruptedException {
public void testCalculateScnWhenFirstTransactionIsCommitted() throws InterruptedException {
CountDownLatch commitLatch = new CountDownLatch(1);
AtomicReference<BigDecimal> smallestScnContainer = new AtomicReference<>();
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> {
smallestScnContainer.set(smallestScn);
commitLatch.countDown();
});
transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn) -> { });
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); // before commit
transactionalBuffer.commit(TRANSACTION_ID, TIMESTAMP, () -> true, MESSAGE);
commitLatch.await();
assertThat(smallestScnContainer.get()).isEqualTo(SCN);
// after commit, it stays the same because OTHER_TRANSACTION_ID is not committed yet
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN);

assertThat(smallestScnContainer.get()).isEqualTo(OTHER_SCN);
assertThat(transactionalBuffer.getRolledBackTransactionIds().isEmpty()).isTrue();
}

@Test
public void testCalculateSmallestScnWhenSecondTransactionIsCommitted() throws InterruptedException {
public void testCalculateScnWhenSecondTransactionIsCommitted() throws InterruptedException {
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> { });
CountDownLatch commitLatch = new CountDownLatch(1);
AtomicReference<BigDecimal> smallestScnContainer = new AtomicReference<>();
transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn) -> {
smallestScnContainer.set(smallestScn);
commitLatch.countDown();
});
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); // before commit
transactionalBuffer.commit(OTHER_TRANSACTION_ID, TIMESTAMP, () -> true, MESSAGE);
commitLatch.await();
assertThat(smallestScnContainer.get()).isEqualTo(SCN);
// after committing OTHER_TRANSACTION_ID
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN.add(BigDecimal.ONE));
assertThat(transactionalBuffer.getRolledBackTransactionIds().isEmpty()).isTrue();
}

@Test
public void testAbandoningTransaction() {
public void testResetLargestScn() {
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> { });
transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn) -> {});
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); // before commit
transactionalBuffer.commit(OTHER_TRANSACTION_ID, TIMESTAMP, () -> true, MESSAGE);
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN.add(BigDecimal.ONE)); // after commit

transactionalBuffer.resetLargestScn(null);
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(BigDecimal.ZERO);
transactionalBuffer.resetLargestScn(OTHER_SCN.longValue());
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN);
}

@Test
public void testAbandoningOneTransaction() {
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> {});
transactionalBuffer.abandonLongTransactions(SCN.longValue());
assertThat(transactionalBuffer.isEmpty()).isEqualTo(true);
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(BigDecimal.ZERO);
}

@Test
public void testAbandoningTransactionHavingAnotherOne() {
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> {});
transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn) -> {});
transactionalBuffer.abandonLongTransactions(SCN.longValue());
assertThat(transactionalBuffer.isEmpty()).isEqualTo(false);
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN);
}

@Test
Expand Down

0 comments on commit 89feb14

Please sign in to comment.