Skip to content

Commit

Permalink
Merge pull request debezium#28 in N4FRA/debezium from DSCON-148_bapli…
Browse files Browse the repository at this point in the history
…e_mismatch_during_Emulation to master

Squashed commit of the following:

commit e874be686a7fc1bd350b9bf92473bfc2b10871cf
Author: AndreyIg <gnyiny@gmail.com>
Date:   Tue Feb 25 18:00:28 2020 -0800

    DSCON-148, baplie mismatch during Oracle Emulation Testing

commit 9fa48df
Merge: d3da472 27eb9af
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Feb 14 16:11:29 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit d3da472
Merge: 86f3f65 081731f
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Feb 3 16:18:33 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 86f3f65
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Feb 3 16:02:43 2020 -0800

    DSCON-117, DBConnector exception while incremental loading - revert

    This reverts commit c3a6023.

commit c3a6023
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Feb 3 15:56:18 2020 -0800

    DSCON-117, DBConnector exception while incremental loading

commit 90f1823
Merge: 605d22f 8f53bba
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Feb 3 13:52:09 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 605d22f
Merge: 276c19b e68ada6
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Jan 24 07:32:11 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 276c19b
Merge: 9b5a3f3 bc8e4be
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Jan 20 14:32:44 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 9b5a3f3
Merge: 9b0ee98 f9de59c
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Jan 15 13:41:38 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 9b0ee98
Merge: 7ab9af3 0fe3cf3
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Dec 27 11:55:19 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 7ab9af3
Merge: 90979c1 d8872ce
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Dec 27 11:48:18 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 90979c1
Merge: d174eab 4086b3e
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Dec 13 13:39:26 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit d174eab
Merge: 0894de1 7e77a83
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Dec 9 08:58:41 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 0894de1
Merge: 8353b85 12021bd
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Nov 27 15:12:39 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 8353b85
Merge: c6f361f b69f88c
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Nov 27 14:57:47 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit c6f361f
Merge: 7b499a3 7c9497a
Author: AndreyIg <gnyiny@gmail.com>
Date:   Tue Nov 26 15:27:32 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 7b499a3
Merge: ff19baa 5be9ef5
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Nov 22 14:09:47 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit ff19baa
Merge: 7b0b476 0309d35
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Sep 27 11:04:12 2019 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 7b0b476
Merge: 5d76700 eaaddfb
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu Sep 5 15:52:31 2019 -0700

    ARGO-198431, new parser

commit 5d76700
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu Sep 5 15:01:49 2019 -0700

    ARGO-198431, new parser

... and 3 more commits
  • Loading branch information
Ignatenko Andrey committed Feb 26, 2020
1 parent 27eb9af commit 15a6c6c
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class LogMinerMetrics extends Metrics implements LogMinerMetricsMXBean {
super(taskContext, "log-miner");

maxBatchSize.set(2000);
millisecondToSleepBetweenMiningQuery.set(500);
millisecondToSleepBetweenMiningQuery.set(1000);
fetchedRecordSizeLimitToFallAsleep.set(50);

currentScn.set(-1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ public int processResult(ResultSet resultSet) {

// Commit
if (operationCode == RowMapper.COMMIT) {
LOGGER.trace("COMMIT, {}", logMessage);
if (transactionalBuffer.commit(txId, changeTime, context, logMessage)){
LOGGER.trace("COMMIT, {}", logMessage);
commitCounter++;
cumulativeCommitTime = cumulativeCommitTime.plus(Duration.between(iterationStart, Instant.now()));
}
Expand All @@ -112,8 +112,8 @@ public int processResult(ResultSet resultSet) {

//Rollback
if (operationCode == RowMapper.ROLLBACK) {
LOGGER.trace("ROLLBACK, {}", logMessage);
if (transactionalBuffer.rollback(txId, logMessage)){
LOGGER.trace("ROLLBACK, {}", logMessage);
rollbackCounter++;
cumulativeRollbackTime = cumulativeRollbackTime.plus(Duration.between(iterationStart, Instant.now()));
}
Expand All @@ -127,6 +127,12 @@ public int processResult(ResultSet resultSet) {
// todo parse, add to the collection.
}

// MISSING_SCN
if (operationCode == RowMapper.MISSING_SCN) {
LOGGER.warn("Missing SCN, {}", logMessage);
continue;
}

// DML
if (operationCode == RowMapper.INSERT || operationCode == RowMapper.DELETE || operationCode == RowMapper.UPDATE) {
LOGGER.trace("DML, {}, sql {}", logMessage, redo_sql);
Expand Down Expand Up @@ -156,6 +162,7 @@ public int processResult(ResultSet resultSet) {
// update SCN in offset context only if processed SCN less than SCN among other transactions
if (smallestScn == null || scn.compareTo(smallestScn) < 0) {
offsetContext.setScn(scn.longValue());
transactionalBufferMetrics.setOldestScn(scn.longValue());
}
offsetContext.setTransactionId(txId);
offsetContext.setSourceTime(timestamp.toInstant());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand Down Expand Up @@ -57,6 +58,7 @@ public class LogMinerStreamingChangeEventSource implements StreamingChangeEventS
private final boolean isContinuousMining;
private long lastProcessedScn;
private long nextScn;
//private long startMiningScn;

public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig, OracleOffsetContext offsetContext,
OracleConnection jdbcConnection, EventDispatcher<TableId> dispatcher,
Expand Down Expand Up @@ -94,6 +96,7 @@ public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig,
@Override
public void execute(ChangeEventSourceContext context) throws InterruptedException {
Metronome metronome;
final long STEP_BACK = 400L; // todo calculate it, based on committed scn

// The top outer loop gives the resiliency on the network disconnections. This is critical for cloud deployment.
while(context.isRunning()) {
Expand All @@ -103,6 +106,8 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
queryLogMinerContents(connectorConfig.getSchemaName(), jdbcConnection.username(), schema, SqlUtils.LOGMNR_CONTENTS_VIEW))) {

lastProcessedScn = offsetContext.getScn();
//startMiningScn = offsetContext.getScn();

long oldestScnInOnlineRedo = LogMinerHelper.getFirstOnlineLogScn(connection);
if (lastProcessedScn < oldestScnInOnlineRedo) {
throw new RuntimeException("Online REDO LOG files don't contain the offset SCN. Clean offset and start over");
Expand Down Expand Up @@ -156,15 +161,15 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
LOGGER.debug("After abandoning, offset before: {}, offset after:{}", offsetContext.getScn(), nextOldestScn);
});

LogMinerHelper.setRedoLogFilesForMining(connection, lastProcessedScn);
LogMinerHelper.setRedoLogFilesForMining(connection, lastProcessedScn - STEP_BACK);
}

LogMinerHelper.updateLogMinerMetrics(connection, logMinerMetrics);

currentRedoLogFile = LogMinerHelper.getCurrentRedoLogFile(connection, logMinerMetrics);
}

LogMinerHelper.startOnlineMining(connection, lastProcessedScn, nextScn, strategy, isContinuousMining);
LogMinerHelper.startOnlineMining(connection, lastProcessedScn - STEP_BACK, nextScn, strategy, isContinuousMining);

Instant startTime = Instant.now();
fetchFromMiningView.setLong(1, lastProcessedScn);
Expand All @@ -179,11 +184,18 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
metronome.pause();
}

// get largest scn from the last uncommitted transaction and set as last processed scn
LOGGER.trace("largest first scn = {}, largest last scn = {}", transactionalBuffer.getLargestFirstScn(), transactionalBuffer.getLargestLastScn());

lastProcessedScn = transactionalBuffer.getLargestLastScn().equals(BigDecimal.ZERO) ? nextScn : transactionalBuffer.getLargestLastScn().longValue();

// update SCN in offset context only if buffer is empty, otherwise we update offset in TransactionalBuffer
if (transactionalBuffer.isEmpty()) {
offsetContext.setScn(nextScn);
offsetContext.setScn(lastProcessedScn);
transactionalBuffer.resetLargestScns();
}
lastProcessedScn = nextScn;

// startMiningScn = transactionalBuffer.getLargestFirstScn().equals(BigDecimal.ZERO) ? nextScn : transactionalBuffer.getLargestFirstScn().longValue();

res.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class RowMapper {
public static final int UPDATE = 3;
public static final int DDL = 5;
public static final int COMMIT = 7;
public static final int MISSING_SCN = 34;
public static final int ROLLBACK = 36;

private static final int SCN = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,8 @@ static String queryLogMinerContents(String schemaName, String logMinerUser, Orac
" AND OPERATION_CODE in (1,2,3,5) " +// 5 - DDL
" AND SEG_OWNER = '"+ schemaName.toUpperCase() +"' " +
buildTableInPredicate(whiteListTableNames) +
// " (commit_scn >= ? " +
" AND SCN > ? AND SCN <= ? " +
//" OR (OPERATION_CODE IN (7,36) AND USERNAME ='"+schemaName.toUpperCase()+"')";
" OR (OPERATION_CODE IN (7,36) AND USERNAME NOT IN ('SYS','SYSTEM','"+logMinerUser.toUpperCase()+"'))" + sorting; //todo username = schemaName?
" AND SCN >= ? AND SCN <= ? " +
" OR (OPERATION_CODE IN (7,34,36) AND USERNAME NOT IN ('SYS','SYSTEM','"+logMinerUser.toUpperCase()+"'))" + sorting; //todo username = schemaName?
}

/**
Expand Down
Loading

0 comments on commit 15a6c6c

Please sign in to comment.