Skip to content

Commit

Permalink
Merge pull request debezium#48 in N4FRA/debezium from DSCON-262_Analy…
Browse files Browse the repository at this point in the history
…se_address_parsing_errors to master

Squashed commit of the following:

commit c4d2db3592eb492890039726eb1ad5889b5dfa1b
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon May 4 11:43:43 2020 -0700

    DSCON-262, Analyse and address parsing errors

commit c8db7e91013df5e2baf22ce981cc0ff9577eb4cf
Merge: d29c1e89 b514c5e
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon May 4 11:42:37 2020 -0700

    Merge branch 'master' into DSCON-262_Analyse_address_parsing_errors

commit b514c5e
Merge: babd47d 2b016d7
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon May 4 11:41:54 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit d29c1e892800143f201bee4f38eaae08e821e0fe
Author: AndreyIg <gnyiny@gmail.com>
Date:   Tue Apr 28 14:12:27 2020 -0700

    DSCON-241,
    supplemental logging on the table level not in synch

commit cf3d0870d7f3dbd180af2a9455facd4df4224e0e
Merge: 9cabba11 babd47d
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Apr 20 12:49:59 2020 -0700

    Merge branch 'master' into DSCON-251_create_table_for_flushing_LMWR_buffer

commit babd47d
Merge: e97206f a991266
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Apr 20 12:49:30 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 9cabba1176dd9c4c7c9004034db708fc13220460
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Apr 20 10:40:02 2020 -0700

    DSCON-251, create audit table as flushing mechanism

commit e97206f
Merge: 815ee19 c8742ae
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu Apr 9 14:43:43 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 815ee19
Merge: b8cbd55 828c9bc
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Apr 3 15:24:08 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit b8cbd55
Merge: 2480b27 b9bfa67
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Mar 27 17:02:47 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 2480b27
Merge: 43ce5a6 4880c24
Author: AndreyIg <gnyiny@gmail.com>
Date:   Tue Mar 24 14:57:36 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 43ce5a6
Merge: 89feb14 ed0579e
Author: AndreyIg <gnyiny@gmail.com>
Date:   Tue Mar 24 14:53:19 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 89feb14
Merge: 50d11dd 6c11363
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Mar 18 15:25:50 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 50d11dd
Merge: 40dd12f 9191658
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Mar 16 17:20:37 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 40dd12f
Merge: 0536eab 2075352
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Mar 16 17:15:12 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 0536eab
Merge: 76f9f80 e30cfbd
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu Mar 12 13:31:51 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 76f9f80
Merge: 77e567e af6f8a3
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Mar 11 12:33:09 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 77e567e
Merge: 8e3d922 0585b2b
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Mar 9 12:07:58 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 8e3d922
Merge: a98bb75 d4bc528
Author: AndreyIg <gnyiny@gmail.com>
Date:   Sat Mar 7 04:26:18 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit a98bb75
Merge: c78c368 a23eb5a
Author: AndreyIg <gnyiny@gmail.com>
Date:   Sat Mar 7 04:12:31 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

... and 28 more commits
  • Loading branch information
Ignatenko Andrey committed May 4, 2020
1 parent 2b016d7 commit 274e969
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 38 deletions.
Expand Up @@ -80,7 +80,10 @@ public SimpleDmlParser(String catalogName, String schemaName, OracleChangeRecord
*/
public LogMinerRowLcr parse(String dmlContent, Tables tables, String txId){
try {
if (dmlContent == null) {

// If a table contains Spatial data type, DML input generates two entries in REDO LOG.
// First with actual statement and second with NULL. It is not relevant at this point
if (dmlContent == null) {
LOGGER.debug("Cannot parse NULL , transaction: {}", txId);
return null;
}
Expand Down
Expand Up @@ -33,7 +33,7 @@
* On commit it executes all registered callbacks, which dispatch ChangeRecords.
* This also calculates metrics
*/
public class LogMinerQueryResultProcessor {
class LogMinerQueryResultProcessor {

private final ChangeEventSource.ChangeEventSourceContext context;
private final LogMinerMetrics metrics;
Expand All @@ -50,7 +50,7 @@ public class LogMinerQueryResultProcessor {
private long currentOffsetCommitScn = 0;


public LogMinerQueryResultProcessor(ChangeEventSource.ChangeEventSourceContext context, LogMinerMetrics metrics,
LogMinerQueryResultProcessor(ChangeEventSource.ChangeEventSourceContext context, LogMinerMetrics metrics,
TransactionalBuffer transactionalBuffer, SimpleDmlParser dmlParser,
OracleOffsetContext offsetContext, OracleDatabaseSchema schema,
EventDispatcher<TableId> dispatcher, TransactionalBufferMetrics transactionalBufferMetrics,
Expand All @@ -72,7 +72,7 @@ public LogMinerQueryResultProcessor(ChangeEventSource.ChangeEventSourceContext c
* @param resultSet the info from Log Miner view
* @return number of processed DMLs from the given resultSet
*/
public int processResult(ResultSet resultSet) {
int processResult(ResultSet resultSet) {
int dmlCounter = 0;
int commitCounter = 0;
int rollbackCounter = 0;
Expand Down Expand Up @@ -147,11 +147,9 @@ public int processResult(ResultSet resultSet) {
dmlCounter++;
iterationStart = Instant.now();
LogMinerRowLcr rowLcr = dmlParser.parse(redo_sql, schema.getTables(), txId);
// dmlParser.parse(redo_sql, schema.getTables());
cumulativeParseTime = cumulativeParseTime.plus(Duration.between(iterationStart, Instant.now()));
iterationStart = Instant.now();

LOGGER.trace("parsed record: {}" , rowLcr);
if (rowLcr == null || redo_sql == null) {
LOGGER.trace("Following statement was not parsed: {}, details: {}", redo_sql, logMessage);
continue;
Expand Down Expand Up @@ -226,7 +224,9 @@ public int processResult(ResultSet resultSet) {
LOGGER.warn("offset SCN {} did not change, the oldest transaction was not committed. Offset commit SCN: {}", currentOffsetScn, offsetContext.getCommitScn());
}
currentOffsetScn = offsetContext.getScn();
currentOffsetCommitScn = offsetContext.getCommitScn();
if (offsetContext.getCommitScn() != null) {
currentOffsetCommitScn = offsetContext.getCommitScn();
}
LOGGER.debug("{} DMLs, {} Commits, {} Rollbacks were processed in {} milliseconds, commit time:{}, rollback time: {}, parse time:{}, " +
"other time:{}, lag:{}, offset scn:{}, offset commit scn:{}, active transactions:{}",
dmlCounter, commitCounter, rollbackCounter, (Duration.between(startTime, Instant.now()).toMillis()),
Expand Down
Expand Up @@ -91,10 +91,9 @@ public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig,
* This is the loop to get changes from LogMiner
*
* @param context change event source context
* @throws InterruptedException an exception
*/
@Override
public void execute(ChangeEventSourceContext context) throws InterruptedException {
public void execute(ChangeEventSourceContext context) {
Metronome metronome;

// The top outer loop gives the resiliency on the network disconnections. This is critical for cloud deployment.
Expand Down Expand Up @@ -138,11 +137,10 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
metronome = Metronome.sleeper(Duration.ofMillis(logMinerMetrics.getMillisecondToSleepBetweenMiningQuery()), clock);

endScn = LogMinerHelper.getNextScn(connection, startScn, logMinerMetrics);
// this is to let LogWriter to finish it's job
// this is to reduce the DB impact
metronome.pause();

LOGGER.debug("startScn: {}, endScn: {}", startScn, endScn);

LOGGER.trace("startScn: {}, endScn: {}", startScn, endScn);
String possibleNewCurrentLogFile = LogMinerHelper.getCurrentRedoLogFile(connection, logMinerMetrics);

if (!currentRedoLogFile.equals(possibleNewCurrentLogFile)) {
Expand Down
Expand Up @@ -26,10 +26,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -86,21 +84,21 @@ public final class TransactionalBuffer {
/**
* @return largest last SCN in the buffer among all transactions
*/
public BigDecimal getLargestScn() {
BigDecimal getLargestScn() {
return largestScn;
}

/**
* @return rolled back transactions
*/
public Set<String> getRolledBackTransactionIds() {
Set<String> getRolledBackTransactionIds() {
return new HashSet<>(rolledBackTransactionIds);
}

/**
* Reset Largest SCN
*/
public void resetLargestScn(Long value) {
void resetLargestScn(Long value) {
if (value != null) {
largestScn = new BigDecimal(value);
} else {
Expand Down Expand Up @@ -139,21 +137,6 @@ void registerCommitCallback(String transactionId, BigDecimal scn, Instant change
return;
}

if (!transaction.flatRedo.add(redoSql)) {
LOGGER.warn("Ignored duplicated capture as of SCN={}, transaction= {}, REDO_SQL={}", scn, transactionId, redoSql);

// todo delete it after stowplan deletion
if (redoSql.contains("INV_UNIT_FCY_VISIT")){
if (redoSql.contains("insert into ")) {
metrics.ifPresent(m -> m.decrementUfvInsert());
}
if (redoSql.contains("delete from ")) {
metrics.ifPresent(m -> m.decrementUfvDelete());
}
}
return;
}

transaction.commitCallbacks.add(callback);
transaction.addRedoSql(scn, redoSql);
}
Expand Down Expand Up @@ -363,18 +346,15 @@ public interface CommitCallback {
private static final class Transaction {

private final BigDecimal firstScn;
// this is SCN candidate, not actual COMMITTED_SCN
private BigDecimal lastScn;
private final List<CommitCallback> commitCallbacks;
private final NavigableMap<BigDecimal, List<String>> redoSqlMap;
private final Set<String> flatRedo;
private final Map<BigDecimal, List<String>> redoSqlMap;

private Transaction(BigDecimal firstScn) {
this.firstScn = firstScn;
this.commitCallbacks = new ArrayList<>();
this.redoSqlMap = new TreeMap<>();
this.redoSqlMap = new HashMap<>();
this.lastScn = firstScn;
flatRedo = new HashSet();
}

private void addRedoSql(BigDecimal scn, String redoSql) {
Expand All @@ -386,7 +366,6 @@ private void addRedoSql(BigDecimal scn, String redoSql) {
} else {
sqlList.add(redoSql);
}
flatRedo.add(redoSql);
}

@Override
Expand Down

0 comments on commit 274e969

Please sign in to comment.