Skip to content

Commit

Permalink
Merge pull request debezium#31 in N4FRA/debezium from DSCON-148_baplie_mismatch_during_Emulation to master
Browse files Browse the repository at this point in the history

Squashed commit of the following:

commit 92e013a859e04691d9259c19a6f97c8a975b3f3a
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Feb 26 17:14:46 2020 -0800

    DSCON-148, baplie mismatch during Oracle Emulation Testing

commit 53b9400bd7f117156a5aa44cce4849ba234ff19d
Merge: b23131d7 9686041
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Feb 26 17:13:31 2020 -0800

    Merge branch 'master' into DSCON-148_baplie_mismatch_during_Emulation

commit 9686041
Merge: 926c648 4996a49
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Feb 26 12:02:35 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 926c648
Merge: 92140a3 829206c
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Feb 26 10:49:29 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit b23131d7b6b5b617de69d69d027dfe85ca9beb25
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Feb 26 10:46:43 2020 -0800

    DSCON-148, baplie mismatch during Oracle Emulation Testing

commit d593ecf4130ca251baea8ebd65e0f90100602229
Merge: e874be68 92140a3
Author: AndreyIg <gnyiny@gmail.com>
Date:   Tue Feb 25 18:15:32 2020 -0800

    Merge branch 'master' into DSCON-148_baplie_mismatch_during_Emulation

commit 92140a3
Merge: 9fa48df 15a6c6c
Author: AndreyIg <gnyiny@gmail.com>
Date:   Tue Feb 25 18:14:58 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit e874be686a7fc1bd350b9bf92473bfc2b10871cf
Author: AndreyIg <gnyiny@gmail.com>
Date:   Tue Feb 25 18:00:28 2020 -0800

    DSCON-148, baplie mismatch during Oracle Emulation Testing

commit 9fa48df
Merge: d3da472 27eb9af
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Feb 14 16:11:29 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit d3da472
Merge: 86f3f65 081731f
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Feb 3 16:18:33 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 86f3f65
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Feb 3 16:02:43 2020 -0800

    DSCON-117, DBConnector exception while incremental loading - revert

    This reverts commit c3a6023.

commit c3a6023
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Feb 3 15:56:18 2020 -0800

    DSCON-117, DBConnector exception while incremental loading

commit 90f1823
Merge: 605d22f 8f53bba
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Feb 3 13:52:09 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 605d22f
Merge: 276c19b e68ada6
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Jan 24 07:32:11 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 276c19b
Merge: 9b5a3f3 bc8e4be
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Jan 20 14:32:44 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 9b5a3f3
Merge: 9b0ee98 f9de59c
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Jan 15 13:41:38 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 9b0ee98
Merge: 7ab9af3 0fe3cf3
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Dec 27 11:55:19 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 7ab9af3
Merge: 90979c1 d8872ce
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Dec 27 11:48:18 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 90979c1
Merge: d174eab 4086b3e
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Dec 13 13:39:26 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit d174eab
Merge: 0894de1 7e77a83
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Dec 9 08:58:41 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

... and 10 more commits
  • Loading branch information
Ignatenko Andrey committed Feb 27, 2020
1 parent 4996a49 commit 51f0dcb
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,13 @@ public class LogMinerStreamingChangeEventSource implements StreamingChangeEventS
// private final OracleDmlParser dmlParser;
private final SimpleDmlParser dmlParser;
private final String catalogName;
//private final int posVersion;
private OracleConnectorConfig connectorConfig;
private final TransactionalBufferMetrics transactionalBufferMetrics;
private final LogMinerMetrics logMinerMetrics;
private final OracleConnectorConfig.LogMiningStrategy strategy;
private final boolean isContinuousMining;
private long lastProcessedScn;
private long nextScn;
//private long startMiningScn;

public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig, OracleOffsetContext offsetContext,
OracleConnection jdbcConnection, EventDispatcher<TableId> dispatcher,
Expand Down Expand Up @@ -96,7 +94,6 @@ public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig,
@Override
public void execute(ChangeEventSourceContext context) throws InterruptedException {
Metronome metronome;
final long STEP_BACK = 400L; // todo calculate it, based on committed scn

// The top outer loop gives the resiliency on the network disconnections. This is critical for cloud deployment.
while(context.isRunning()) {
Expand All @@ -106,7 +103,6 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
queryLogMinerContents(connectorConfig.getSchemaName(), jdbcConnection.username(), schema, SqlUtils.LOGMNR_CONTENTS_VIEW))) {

lastProcessedScn = offsetContext.getScn();
//startMiningScn = offsetContext.getScn();

long oldestScnInOnlineRedo = LogMinerHelper.getFirstOnlineLogScn(connection);
if (lastProcessedScn < oldestScnInOnlineRedo) {
Expand Down Expand Up @@ -161,15 +157,15 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
LOGGER.debug("After abandoning, offset before: {}, offset after:{}", offsetContext.getScn(), nextOldestScn);
});

LogMinerHelper.setRedoLogFilesForMining(connection, lastProcessedScn - STEP_BACK);
LogMinerHelper.setRedoLogFilesForMining(connection, lastProcessedScn);
}

LogMinerHelper.updateLogMinerMetrics(connection, logMinerMetrics);

currentRedoLogFile = LogMinerHelper.getCurrentRedoLogFile(connection, logMinerMetrics);
}

LogMinerHelper.startOnlineMining(connection, lastProcessedScn - STEP_BACK, nextScn, strategy, isContinuousMining);
LogMinerHelper.startOnlineMining(connection, lastProcessedScn, nextScn, strategy, isContinuousMining);

Instant startTime = Instant.now();
fetchFromMiningView.setLong(1, lastProcessedScn);
Expand All @@ -185,19 +181,21 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
}

// get largest scn from the last uncommitted transaction and set as last processed scn
LOGGER.trace("largest first scn = {}, largest last scn = {}", transactionalBuffer.getLargestFirstScn(), transactionalBuffer.getLargestLastScn());
LOGGER.trace("largest scn = {}", transactionalBuffer.getLargestScn());

lastProcessedScn = transactionalBuffer.getLargestLastScn().equals(BigDecimal.ZERO) ? nextScn : transactionalBuffer.getLargestLastScn().longValue();
lastProcessedScn = transactionalBuffer.getLargestScn().equals(BigDecimal.ZERO) ? nextScn : transactionalBuffer.getLargestScn().longValue();

// update SCN in offset context only if buffer is empty, otherwise we update offset in TransactionalBuffer
if (transactionalBuffer.isEmpty()) {
offsetContext.setScn(lastProcessedScn);
transactionalBuffer.resetLargestScns();
transactionalBuffer.resetLargestScn();
}

// startMiningScn = transactionalBuffer.getLargestFirstScn().equals(BigDecimal.ZERO) ? nextScn : transactionalBuffer.getLargestFirstScn().longValue();

res.close();
// we don't do it for other modes to save time on building data dictionary
// if (strategy == OracleConnectorConfig.LogMiningStrategy.ONLINE_CATALOG) {
// LogMinerHelper.endMining(connection);
// }
}
} catch (Throwable e) {
if (connectionProblem(e)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,10 @@ static String queryLogMinerContents(String schemaName, String logMinerUser, Orac
" FROM " + miningViewName +
" WHERE " +
// currently we do not capture changes from other schemas
" USERNAME = '"+ schemaName.toUpperCase() +"'" +
" AND OPERATION_CODE in (1,2,3,5) " +// 5 - DDL
" OPERATION_CODE in (1,2,3,5) " +// 5 - DDL
" AND SEG_OWNER = '"+ schemaName.toUpperCase() +"' " +
buildTableInPredicate(whiteListTableNames) +
" AND SCN >= ? AND SCN <= ? " +
" AND SCN > ? AND SCN <= ? " +
" OR (OPERATION_CODE IN (7,34,36) AND USERNAME NOT IN ('SYS','SYSTEM','"+logMinerUser.toUpperCase()+"'))" + sorting; //todo username = schemaName?
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -48,8 +47,7 @@ public final class TransactionalBuffer {
private final ErrorHandler errorHandler;
private Optional<TransactionalBufferMetrics> metrics;
private final Set<String> abandonedTransactionIds;
private BigDecimal largestFirstScn;
private BigDecimal largestLastScn;
private BigDecimal largestScn;

/**
* Constructor to create a new instance.
Expand All @@ -68,34 +66,23 @@ public final class TransactionalBuffer {
} else {
this.metrics = Optional.empty();
}
largestFirstScn = BigDecimal.ZERO;
largestLastScn = BigDecimal.ZERO;
largestScn = BigDecimal.ZERO;
this.abandonedTransactionIds = new HashSet<>();
}

/**
* the largest SCN in entire buffer
*
* @return largest first SCN in the buffer among all transactions
*/
public BigDecimal getLargestFirstScn() {
return largestFirstScn;
}

/**
*
* @return largest last SCN in the buffer among all transactions
*/
public BigDecimal getLargestLastScn() {
return largestLastScn;
public BigDecimal getLargestScn() {
return largestScn;
}

/**
* Reset Largest SCNs
*/
public void resetLargestScns() {
largestLastScn = BigDecimal.ZERO;
largestFirstScn = BigDecimal.ZERO;
public void resetLargestScn() {
largestScn = BigDecimal.ZERO;
}

/**
Expand Down Expand Up @@ -123,19 +110,12 @@ void registerCommitCallback(String transactionId, BigDecimal scn, Instant change
// Another way to do it would be storing Map of SCN and Tuple of RAW_ID and TABLE_NAME as unique identifier of a DML
Transaction transaction = transactions.get(transactionId);
if (transaction != null) {
if (transaction.lastScn != null && transaction.lastScn.equals(scn) && transaction.redoSqlMap.get(scn) != null) {
List<String> redoSqls = transaction.redoSqlMap.get(scn);
if (redoSqls.contains(redoSql)) {
LOGGER.debug("Ignored duplicated capture as of SCN={}, REDO_SQL={}", scn, redoSql);
return;
}
}
transaction.commitCallbacks.add(callback);
transaction.addRedoSql(scn, redoSql);
}

if (scn.compareTo(largestLastScn) > 0) {
largestLastScn = scn;
if (scn.compareTo(largestScn) > 0) {
largestScn = scn;
}
}

Expand All @@ -152,8 +132,7 @@ boolean commit(String transactionId, Timestamp timestamp, ChangeEventSource.Chan
Transaction transaction = transactions.get(transactionId);
if (transaction != null) {
transaction.lastScn = transaction.lastScn.add(BigDecimal.ONE);
calculateLargestFirstScn();
calculateLargestLastScn();
calculateLargestScn();
}

transaction = transactions.remove(transactionId);
Expand Down Expand Up @@ -217,8 +196,7 @@ void abandonLongTransactions(Long thresholdScn) {
abandonedTransactionIds.add(transaction.getKey());
iter.remove();

calculateLargestFirstScn();
calculateLargestLastScn();
calculateLargestScn();

metrics.ifPresent(t -> t.addAbandonedTransactionId(transaction.getKey()));
metrics.ifPresent(t -> t.decrementCapturedDmlCounter(transaction.getValue().commitCallbacks.size()));
Expand All @@ -237,22 +215,12 @@ private BigDecimal calculateSmallestScn() {
return scn;
}

private void calculateLargestFirstScn() {
largestFirstScn = transactions.isEmpty() ? BigDecimal.ZERO : transactions.values()
.stream()
.map(transaction -> transaction.firstScn)
.max(BigDecimal::compareTo)
.orElseThrow(() -> new DataException("Cannot calculate largest first SCN"));
// metrics.ifPresent(m -> m.setOldestScn(scn == null ? -1 : scn.longValue()));
}

private void calculateLargestLastScn() {
largestLastScn = transactions.isEmpty() ? BigDecimal.ZERO : transactions.values()
private void calculateLargestScn() {
largestScn = transactions.isEmpty() ? BigDecimal.ZERO : transactions.values()
.stream()
.map(transaction -> transaction.lastScn)
.max(BigDecimal::compareTo)
.orElseThrow(() -> new DataException("Cannot calculate largest last SCN"));
// metrics.ifPresent(m -> m.setOldestScn(scn == null ? -1 : scn.longValue()));
.orElseThrow(() -> new DataException("Cannot calculate largest SCN"));
}

/**
Expand Down Expand Up @@ -327,32 +295,26 @@ private static final class Transaction {
// this is SCN candidate, not actual COMMITTED_SCN
private BigDecimal lastScn;
private final List<CommitCallback> commitCallbacks;
private final Map<BigDecimal, List<String>> redoSqlMap;
private final List<String> redoSqls; // TODO delete after debugging

private Transaction(BigDecimal firstScn) {
this.firstScn = firstScn;
this.commitCallbacks = new ArrayList<>();
this.redoSqlMap = new HashMap<>();
this.redoSqls = new ArrayList<>();
this.lastScn = firstScn;
}

private void addRedoSql(BigDecimal scn, String redoSql) {
this.lastScn = scn;

List<String> sqlList = redoSqlMap.get(scn);
if (sqlList == null) {
redoSqlMap.put(scn, new ArrayList<>(Collections.singletonList(redoSql)));
} else {
sqlList.add(redoSql);
}
this.redoSqls.add(redoSql);
}

@Override
public String toString() {
return "Transaction{" +
"firstScn=" + firstScn +
", lastScn=" + lastScn +
", redoSqls=" + Arrays.toString(redoSqlMap.values().toArray()) +
", redoSqls=" + Arrays.toString(redoSqls.toArray()) +
'}';
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,39 +158,4 @@ public void testTransactionDump() {
assertThat(transactionalBuffer.toString()).contains(SQL_ONE);
assertThat(transactionalBuffer.toString()).contains(SQL_TWO);
}

@Test
public void testDuplicatedRedoSql() {

assertThat(transactionalBuffer.getLargestFirstScn().equals(BigDecimal.ZERO));

final String insertIntoATable = "insert into a table";
final String anotherInsertIntoATable = "another insert into a table";
final String duplicatedInsertIntoATable = "duplicated insert into a table";

transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), insertIntoATable, (timestamp, smallestScn) -> { });
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, OTHER_SCN, Instant.now(), duplicatedInsertIntoATable, (timestamp, smallestScn) -> { });
assertThat(transactionalBuffer.getLargestFirstScn().equals(OTHER_SCN));
assertThat(transactionalBuffer.toString().contains(insertIntoATable));
assertThat(transactionalBuffer.toString().contains(duplicatedInsertIntoATable));

transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, LARGEST_SCN, Instant.now(), insertIntoATable, (timestamp, smallestScn) -> { });
transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), duplicatedInsertIntoATable, (timestamp, smallestScn) -> { });
assertThat(transactionalBuffer.getLargestFirstScn().equals(LARGEST_SCN));
assertThat(transactionalBuffer.toString().contains(insertIntoATable));
assertThat(transactionalBuffer.toString().contains(duplicatedInsertIntoATable));
// make sure the duplications are OK in different transactions
assertThat(transactionalBuffer.toString().indexOf(duplicatedInsertIntoATable) != transactionalBuffer.toString().lastIndexOf(duplicatedInsertIntoATable));

transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), anotherInsertIntoATable, (timestamp, smallestScn) -> { });
assertThat(transactionalBuffer.toString().contains(anotherInsertIntoATable));
transactionalBuffer.rollback(OTHER_TRANSACTION_ID, "");

// try to duplicate sql for the same transaction for the same SCN
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, OTHER_SCN, Instant.now(), duplicatedInsertIntoATable, (timestamp, smallestScn) -> { });

assertThat(transactionalBuffer.toString().indexOf(duplicatedInsertIntoATable) == transactionalBuffer.toString().lastIndexOf(duplicatedInsertIntoATable));
assertThat(transactionalBuffer.getLargestFirstScn().equals(LARGEST_SCN));
}

}

0 comments on commit 51f0dcb

Please sign in to comment.