Skip to content

Commit

Permalink
Merge pull request debezium#42 in N4FRA/debezium from DSCON-175_persi…
Browse files Browse the repository at this point in the history
…st_transactional_buffer to master

Squashed commit of the following:

commit 374c32f8a8a11af620605ff93affe57c5a661eb9
Merge: a6fd82c9 43ce5a6
Author: AndreyIg <gnyiny@gmail.com>
Date:   Tue Mar 24 14:53:56 2020 -0700

    Merge branch 'master' into DSCON-175_persist_transactional_buffer

commit 43ce5a6
Merge: 89feb14 ed0579e
Author: AndreyIg <gnyiny@gmail.com>
Date:   Tue Mar 24 14:53:19 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit a6fd82c9e1eb6ac4a60122103be8e07cde3401cf
Author: AndreyIg <gnyiny@gmail.com>
Date:   Tue Mar 24 14:48:41 2020 -0700

    DSCON-175, persist transactional buffer

commit 88a7202de2c2035c9ca4e81ce7f91c3bb3010aa9
Merge: 41151847 89feb14
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Mar 18 15:29:10 2020 -0700

    Merge branch 'master' into DSCON-125_column_blacklist_Oracle

commit 89feb14
Merge: 50d11dd 6c11363
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Mar 18 15:25:50 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 50d11dd
Merge: 40dd12f 9191658
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Mar 16 17:20:37 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 40dd12f
Merge: 0536eab 2075352
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Mar 16 17:15:12 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 0536eab
Merge: 76f9f80 e30cfbd
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu Mar 12 13:31:51 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 76f9f80
Merge: 77e567e af6f8a3
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Mar 11 12:33:09 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 77e567e
Merge: 8e3d922 0585b2b
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Mar 9 12:07:58 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 8e3d922
Merge: a98bb75 d4bc528
Author: AndreyIg <gnyiny@gmail.com>
Date:   Sat Mar 7 04:26:18 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 41151847286d2e0c1b6979dbde1420f0221a37cc
Author: AndreyIg <gnyiny@gmail.com>
Date:   Sat Mar 7 04:18:39 2020 -0800

    DSCON-125, make column blacklist working for Oracle

commit badff6d5dec76671afd4d9bd5568f1155a58cbb6
Merge: 2db5385f a98bb75
Author: AndreyIg <gnyiny@gmail.com>
Date:   Sat Mar 7 04:18:19 2020 -0800

    Merge branch 'master' into DSCON-125_column_blacklist_Oracle

commit a98bb75
Merge: c78c368 a23eb5a
Author: AndreyIg <gnyiny@gmail.com>
Date:   Sat Mar 7 04:12:31 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 2db5385f4bb688976a77b7ef6f6f8a401515216d
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Mar 6 18:01:17 2020 -0800

    DSCON-125, make column blacklist working for Oracle

commit c78c368
Merge: 90bcc19 4619fcd
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Mar 6 06:52:42 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 90bcc19
Merge: b5d1ea7 3e3aeea
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Mar 2 14:31:07 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit b5d1ea7
Merge: 9686041 51f0dcb
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Feb 26 17:17:38 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 9686041
Merge: 926c648 4996a49
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Feb 26 12:02:35 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 926c648
Merge: 92140a3 829206c
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Feb 26 10:49:29 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

... and 23 more commits
  • Loading branch information
Ignatenko Andrey committed Mar 24, 2020
1 parent ed0579e commit 4880c24
Show file tree
Hide file tree
Showing 11 changed files with 191 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public class OracleOffsetContext implements OffsetContext {
*/
private boolean snapshotCompleted;

public OracleOffsetContext(OracleConnectorConfig connectorConfig, long scn, Long commitScn, LcrPosition lcrPosition, boolean snapshot, boolean snapshotCompleted) {
this(connectorConfig, scn, lcrPosition, snapshot, snapshotCompleted);
sourceInfo.setCommitScn(commitScn);
}

private OracleOffsetContext(OracleConnectorConfig connectorConfig, long scn, LcrPosition lcrPosition, boolean snapshot, boolean snapshotCompleted) {
partition = Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName());

Expand Down Expand Up @@ -112,7 +117,12 @@ public static Builder create() {
if (sourceInfo.getLcrPosition() != null) {
return Collections.singletonMap(SourceInfo.LCR_POSITION_KEY, sourceInfo.getLcrPosition().toString());
}
return Collections.singletonMap(SourceInfo.SCN_KEY, sourceInfo.getScn());
Map<String, Object> offset = new HashMap<>();

offset.put(SourceInfo.SCN_KEY, sourceInfo.getScn());
offset.put(SourceInfo.COMMIT_SCN_KEY, sourceInfo.getCommitScn());

return offset;
}
}

Expand All @@ -130,10 +140,18 @@ public void setScn(long scn) {
sourceInfo.setScn(scn);
}

public void setCommitScn(Long commitScn) {
sourceInfo.setCommitScn(commitScn);
}

public long getScn() {
return sourceInfo.getScn();
}

public Long getCommitScn() {
return sourceInfo.getCommitScn();
}

public void setLcrPosition(LcrPosition lcrPosition) {
sourceInfo.setLcrPosition(lcrPosition);
}
Expand Down Expand Up @@ -216,7 +234,8 @@ public OffsetContext load(Map<String, ?> offset) {
Long scn;
if (adapter == OracleConnectorConfig.ConnectorAdapter.LOG_MINER){
scn = (Long) offset.get(SourceInfo.SCN_KEY);
return new OracleOffsetContext(connectorConfig, scn, null, snapshot, snapshotCompleted);
Long commitScn = (Long) offset.get(SourceInfo.COMMIT_SCN_KEY);
return new OracleOffsetContext(connectorConfig, scn, commitScn, null, snapshot, snapshotCompleted);
} else {
LcrPosition lcrPosition = LcrPosition.valueOf((String) offset.get(SourceInfo.LCR_POSITION_KEY));
scn = lcrPosition != null ? lcrPosition.getScn() : (Long) offset.get(SourceInfo.SCN_KEY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ public static String buildSelectColumns(String blackListColumnStr, Table table)
}
return sb.toString();
}).collect(Collectors.joining(","));
// todo this is an unnecessary code, fix unit test, then remove it
String catalog = table.id().catalog();
List<String> blackList = new ArrayList<>(Arrays.asList(blackListColumnStr.trim().toUpperCase().replaceAll(catalog + ".", "").split(",")));
List<String> allColumns = new ArrayList<>(Arrays.asList(allTableColumns.toUpperCase().split(",")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public OracleSourceInfoStructMaker(String connector, String version, CommonConne
.field(SourceInfo.TABLE_NAME_KEY, Schema.STRING_SCHEMA)
.field(SourceInfo.TXID_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SourceInfo.SCN_KEY, Schema.OPTIONAL_INT64_SCHEMA)
.field(SourceInfo.COMMIT_SCN_KEY, Schema.OPTIONAL_INT64_SCHEMA)
.field(SourceInfo.LCR_POSITION_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.build();
}
Expand All @@ -43,6 +44,9 @@ public Struct struct(SourceInfo sourceInfo) {
if (sourceInfo.getLcrPosition() != null) {
ret.put(SourceInfo.LCR_POSITION_KEY, sourceInfo.getLcrPosition().toString());
}
if(sourceInfo.getCommitScn() != null) {
ret.put(SourceInfo.COMMIT_SCN_KEY, sourceInfo.getCommitScn());
}
return ret;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ public class SourceInfo extends BaseSourceInfo {

public static final String TXID_KEY = "txId";
public static final String SCN_KEY = "scn";
public static final String COMMIT_SCN_KEY = "commit_scn";
public static final String LCR_POSITION_KEY = "lcr_position";
public static final String SNAPSHOT_KEY = "snapshot";

private long scn;
private Long commitScn;
private LcrPosition lcrPosition;
private String transactionId;
private Instant sourceTime;
Expand All @@ -34,10 +36,18 @@ public long getScn() {
return scn;
}

public Long getCommitScn() {
return commitScn;
}

public void setScn(long scn) {
this.scn = scn;
}

public void setCommitScn(Long commitScn) {
this.commitScn = commitScn;
}

public LcrPosition getLcrPosition() {
return lcrPosition;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public SimpleDmlParser(String catalogName, String schemaName, OracleChangeRecord
public void parse(String dmlContent, Tables tables, String txId){
try {
if (dmlContent == null) {
LOGGER.error("Cannot parse NULL , transaction: {}", txId);
LOGGER.warn("Cannot parse NULL , transaction: {}", txId);
rowLCR = null;
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ public int processResult(ResultSet resultSet) {
Timestamp changeTime = RowMapper.getChangeTime(resultSet);
String txId = RowMapper.getTransactionId(resultSet);

String logMessage = String.format("transactionId = %s, SCN= %s, table_name= %s, segOwner= %s, operationCode=%s, offsetSCN= %s",
txId, scn, tableName, segOwner, operationCode, offsetContext.getScn());
String logMessage = String.format("transactionId = %s, SCN= %s, table_name= %s, segOwner= %s, operationCode=%s, offsetSCN= %s, " +
" commitOffsetSCN= %s", txId, scn, tableName, segOwner, operationCode, offsetContext.getScn(), offsetContext.getCommitScn());

if (scn == null) {
LOGGER.warn("Scn is null for {}", logMessage);
Expand All @@ -108,7 +108,7 @@ public int processResult(ResultSet resultSet) {

// Commit
if (operationCode == RowMapper.COMMIT) {
if (transactionalBuffer.commit(txId, changeTime, context, logMessage)){
if (transactionalBuffer.commit(txId, scn, offsetContext, changeTime, context, logMessage)){
LOGGER.trace("COMMIT, {}", logMessage);
commitCounter++;
cumulativeCommitTime = cumulativeCommitTime.plus(Duration.between(iterationStart, Instant.now()));
Expand Down Expand Up @@ -174,7 +174,17 @@ public int processResult(ResultSet resultSet) {
try {
TableId tableId = RowMapper.getTableId(catalogName, resultSet);

transactionalBuffer.registerCommitCallback(txId, scn, changeTime.toInstant(), redo_sql, (timestamp, smallestScn) -> {
// todo delete after stowplan confirmation
if ("inv_unit_fcy_visit".equalsIgnoreCase(tableName)) {
if (operationCode == 1) {
transactionalBufferMetrics.incrementUfvInsert();
}
if (operationCode == 2) {
transactionalBufferMetrics.incrementUfvDelete();
}
}

transactionalBuffer.registerCommitCallback(txId, scn, changeTime.toInstant(), redo_sql, (timestamp, smallestScn, commitScn, counter) -> {
// update SCN in offset context only if processed SCN less than SCN among other transactions
if (smallestScn == null || scn.compareTo(smallestScn) < 0) {
offsetContext.setScn(scn.longValue());
Expand All @@ -183,6 +193,9 @@ public int processResult(ResultSet resultSet) {
offsetContext.setTransactionId(txId);
offsetContext.setSourceTime(timestamp.toInstant());
offsetContext.setTableId(tableId);
if (counter == 0){
offsetContext.setCommitScn(commitScn.longValue());
}
Table table = schema.tableFor(tableId);
LOGGER.trace("Processing DML event {} scn {}", rowLcr.toString(), scn);

Expand All @@ -201,11 +214,12 @@ public int processResult(ResultSet resultSet) {
metrics.setCapturedDmlCount(dmlCounter);
if (dmlCounter > 0 || commitCounter > 0 || rollbackCounter > 0) {
LOGGER.debug("{} DMLs, {} Commits, {} Rollbacks were processed in {} milliseconds, commit time:{}, rollback time: {}, parse time:{}, " +
"other time:{}, lag:{}, offset:{}",
"other time:{}, lag:{}, offset scn:{}, offset commit scn:{}, active transactions:{}",
dmlCounter, commitCounter, rollbackCounter, (Duration.between(startTime, Instant.now()).toMillis()),
cumulativeCommitTime.toMillis(), cumulativeRollbackTime.toMillis(),
cumulativeParseTime.toMillis(), cumulativeOtherTime.toMillis(),
transactionalBufferMetrics.getLagFromSource(), offsetContext.getScn());
transactionalBufferMetrics.getLagFromSource(), offsetContext.getScn(),
offsetContext.getCommitScn(), transactionalBufferMetrics.getNumberOfActiveTransactions());
}
return dmlCounter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.debezium.annotation.NotThreadSafe;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.util.Threads;
Expand Down Expand Up @@ -144,14 +145,6 @@ void registerCommitCallback(String transactionId, BigDecimal scn, Instant change
return;
}

// BigDecimal previousScn = transaction.redoSqlMap.floorKey(scn);
// if (previousScn != null) {
// if (transaction.redoSqlMap.get(previousScn) != null && transaction.redoSqlMap.get(previousScn).contains(redoSql)) {
// LOGGER.debug("Ignored duplicated capture for the previous SCN={}, REDO_SQL={}", scn, redoSql);
// return;
// }
// }

transaction.commitCallbacks.add(callback);
transaction.addRedoSql(scn, redoSql);
}
Expand All @@ -163,48 +156,60 @@ void registerCommitCallback(String transactionId, BigDecimal scn, Instant change

/**
* @param transactionId transaction identifier
* @param commitScn SCN of the commit
* @param offsetContext Oracle offset
* @param timestamp commit timestamp
* @param context context to check that source is running
* @param debugMessage todo delete
* @return true if committed transaction is in the buffer
* @return true if committed transaction is in the buffer and was not processed already
*/
boolean commit(String transactionId, Timestamp timestamp, ChangeEventSource.ChangeEventSourceContext context, String debugMessage) {
boolean commit(String transactionId, BigDecimal commitScn, OracleOffsetContext offsetContext, Timestamp timestamp,
ChangeEventSource.ChangeEventSourceContext context, String debugMessage) {

Transaction transaction = transactions.get(transactionId);
if (transaction != null) {
transaction.lastScn = transaction.lastScn.add(BigDecimal.ONE);
calculateLargestScn();
if (transaction == null) {
return false;
}

transaction.lastScn = transaction.lastScn.add(BigDecimal.ONE);
calculateLargestScn();

transaction = transactions.remove(transactionId);
BigDecimal smallestScn = calculateSmallestScn();

if (transaction == null) {
return false;
}
taskCounter.incrementAndGet();
abandonedTransactionIds.remove(transactionId);

if (offsetContext.getCommitScn() != null && offsetContext.getCommitScn() >= commitScn.longValue()) {
LOGGER.info("Transaction {} was already processed. Committed SCN in offset is {}, commit SCN of the transaction is {}",
transactionId, offsetContext.getCommitScn(), commitScn);
metrics.ifPresent(m -> m.setActiveTransactions(transactions.size()));
return false;
}

List<CommitCallback> commitCallbacks = transaction.commitCallbacks;
LOGGER.trace("COMMIT, {}, smallest SCN: {}, largest SCN {}", debugMessage, smallestScn, largestScn);
LOGGER.debug("COMMIT, {}, smallest SCN: {}, largest SCN {}", debugMessage, smallestScn, largestScn);
executor.execute(() -> {
try {
int counter = commitCallbacks.size();
for (CommitCallback callback : commitCallbacks) {
if (!context.isRunning()) {
return;
}
callback.execute(timestamp, smallestScn);
metrics.ifPresent(TransactionalBufferMetrics::incrementCommittedTransactions);
callback.execute(timestamp, smallestScn, commitScn, --counter);
}

metrics.ifPresent(TransactionalBufferMetrics::incrementCommittedTransactions);
metrics.ifPresent(m -> m.setActiveTransactions(transactions.size()));
metrics.ifPresent(m -> m.incrementCommittedDmlCounter(commitCallbacks.size()));
metrics.ifPresent(m -> m.setCommittedScn(commitScn.longValue()));
} catch (InterruptedException e) {
LOGGER.error("Thread interrupted during running", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
errorHandler.setProducerThrowable(e);
} finally {
taskCounter.decrementAndGet();
metrics.ifPresent(m -> m.setActiveTransactions(transactions.size()));
metrics.ifPresent(m -> m.incrementCommittedDmlCounter(commitCallbacks.size()));
}
});

Expand Down Expand Up @@ -340,8 +345,10 @@ public interface CommitCallback {
*
* @param timestamp commit timestamp
* @param smallestScn smallest SCN among other transactions
* @param commitScn commit SCN
* @param callbackNumber number of the callback in the transaction
*/
void execute(Timestamp timestamp, BigDecimal smallestScn) throws InterruptedException;
void execute(Timestamp timestamp, BigDecimal smallestScn, BigDecimal commitScn, int callbackNumber) throws InterruptedException;
}

@NotThreadSafe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
@ThreadSafe
public class TransactionalBufferMetrics extends Metrics implements TransactionalBufferMetricsMXBean {
private AtomicLong oldestScn = new AtomicLong();
private AtomicLong committedScn = new AtomicLong();
private AtomicReference<Duration> lagFromTheSource = new AtomicReference<>();
private AtomicInteger activeTransactions = new AtomicInteger();
private AtomicLong rolledBackTransactions = new AtomicLong();
Expand All @@ -37,10 +38,32 @@ public class TransactionalBufferMetrics extends Metrics implements Transactional
private Instant startTime;
private static long MILLIS_PER_SECOND = 1000L;


// temp todo delete after stowplan testing
private Long ufvDelete = 0L;
private Long ufvInsert = 0L;

public Long getUfvDelete() {
return ufvDelete;
}

public void incrementUfvDelete() {
this.ufvDelete++;
}

public Long getUfvInsert() {
return ufvInsert;
}

public void incrementUfvInsert() {
this.ufvInsert++;
}

TransactionalBufferMetrics(CdcSourceTaskContext taskContext) {
super(taskContext, "log-miner-transactional-buffer");
startTime = Instant.now();
oldestScn.set(-1);
committedScn.set(-1);
lagFromTheSource.set(Duration.ZERO);
reset();
}
Expand All @@ -50,6 +73,10 @@ void setOldestScn(Long scn){
oldestScn.set(scn);
}

public void setCommittedScn(Long scn){
committedScn.set(scn);
}

// todo deal with timezones
void setLagFromTheSource(Instant changeTime){
if (changeTime != null) {
Expand Down Expand Up @@ -108,6 +135,11 @@ public Long getOldestScn() {
return oldestScn.get();
}

@Override
public Long getCommittedScn() {
return committedScn.get();
}

@Override
public int getNumberOfActiveTransactions() {
return activeTransactions.get();
Expand Down Expand Up @@ -182,6 +214,7 @@ public void reset() {
public String toString() {
return "TransactionalBufferMetrics{" +
"oldestScn=" + oldestScn.get() +
", committedScn=" + committedScn.get() +
", lagFromTheSource=" + lagFromTheSource.get() +
", activeTransactions=" + activeTransactions.get() +
", rolledBackTransactions=" + rolledBackTransactions.get() +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ public interface TransactionalBufferMetricsMXBean {
*/
Long getOldestScn();

/**
* It shows last committed SCN
* @return committed SCN
*/
Long getCommittedScn();

/**
* This is to get the lag between latest captured change timestamp in REDO LOG and time of it's placement in the buffer
* @return lag in milliseconds
Expand Down Expand Up @@ -95,4 +101,10 @@ public interface TransactionalBufferMetricsMXBean {
* action to reset some metrics
*/
void reset();

// todo delete after stowplan test
Long getUfvDelete();
void incrementUfvDelete();
Long getUfvInsert();
void incrementUfvInsert();
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public void schemaIsCorrect() {
.field("table", Schema.STRING_SCHEMA)
.field("txId", Schema.OPTIONAL_STRING_SCHEMA)
.field("scn", Schema.OPTIONAL_INT64_SCHEMA)
.field("commit_scn", Schema.OPTIONAL_INT64_SCHEMA)
.field("lcr_position", Schema.OPTIONAL_STRING_SCHEMA)
.build();

Expand Down

0 comments on commit 4880c24

Please sign in to comment.