Skip to content

Commit

Permalink
Merge pull request #35 in N4FRA/debezium from DSOPS-101_session_loops…
Browse files Browse the repository at this point in the history
…_after_rollback to master

Squashed commit of the following:

commit ddd3c1867ca7fd5aea6f0d54c8b431e8bc6648f1
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Mar 9 11:16:49 2020 -0700

    DSOPS-101, mining session loops after rollback

commit 8e3d922
Merge: a98bb75 d4bc528
Author: AndreyIg <gnyiny@gmail.com>
Date:   Sat Mar 7 04:26:18 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit a98bb75
Merge: c78c368 a23eb5a
Author: AndreyIg <gnyiny@gmail.com>
Date:   Sat Mar 7 04:12:31 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit c78c368
Merge: 90bcc19 4619fcd
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Mar 6 06:52:42 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 90bcc19
Merge: b5d1ea7 3e3aeea
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Mar 2 14:31:07 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit b5d1ea7
Merge: 9686041 51f0dcb
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Feb 26 17:17:38 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 9686041
Merge: 926c648 4996a49
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Feb 26 12:02:35 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 926c648
Merge: 92140a3 829206c
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Feb 26 10:49:29 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 92140a3
Merge: 9fa48df 15a6c6c
Author: AndreyIg <gnyiny@gmail.com>
Date:   Tue Feb 25 18:14:58 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 9fa48df
Merge: d3da472 27eb9af
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Feb 14 16:11:29 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit d3da472
Merge: 86f3f65 081731f
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Feb 3 16:18:33 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 86f3f65
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Feb 3 16:02:43 2020 -0800

    DSCON-117, DBConnector exception while incremental loading - revert

    This reverts commit c3a6023.

commit c3a6023
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Feb 3 15:56:18 2020 -0800

    DSCON-117, DBConnector exception while incremental loading

commit 90f1823
Merge: 605d22f 8f53bba
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Feb 3 13:52:09 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 605d22f
Merge: 276c19b e68ada6
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Jan 24 07:32:11 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 276c19b
Merge: 9b5a3f3 bc8e4be
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Jan 20 14:32:44 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 9b5a3f3
Merge: 9b0ee98 f9de59c
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Jan 15 13:41:38 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 9b0ee98
Merge: 7ab9af3 0fe3cf3
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Dec 27 11:55:19 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 7ab9af3
Merge: 90979c1 d8872ce
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Dec 27 11:48:18 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 90979c1
Merge: d174eab 4086b3e
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Dec 13 13:39:26 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

... and 11 more commits
  • Loading branch information
Ignatenko Andrey committed Mar 9, 2020
1 parent d4bc528 commit 0585b2b
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public final class TransactionalBuffer {
private final ErrorHandler errorHandler;
private Optional<TransactionalBufferMetrics> metrics;
private final Set<String> abandonedTransactionIds;
// storing rolledBackTransactionIds is for debugging purposes to check what was rolled back to research, todo delete in future releases
private final Set<String> rolledBackTransactionIds;
private BigDecimal largestScn;

/**
Expand All @@ -72,6 +74,7 @@ public final class TransactionalBuffer {
}
largestScn = BigDecimal.ZERO;
this.abandonedTransactionIds = new HashSet<>();
this.rolledBackTransactionIds = new HashSet<>();
}

/**
Expand Down Expand Up @@ -114,26 +117,25 @@ void registerCommitCallback(String transactionId, BigDecimal scn, Instant change
Transaction transaction = transactions.get(transactionId);
if (transaction != null) {

// todo this should never happen, delete when tested and confirmed
if (rolledBackTransactionIds.contains(transactionId)) {
LOGGER.debug("Ignore DML for rolled back transaction: SCN={}, REDO_SQL={}", scn, redoSql);
return;
}

List<String> redoSqls = transaction.redoSqlMap.values().stream().flatMap(List::stream).collect(Collectors.toList());
if (redoSqls.contains(redoSql)) {
LOGGER.debug("Ignored duplicated capture as of SCN={}, REDO_SQL={}", scn, redoSql);
return;
}

/*
if (transaction.redoSqlMap.get(scn) != null && transaction.redoSqlMap.get(scn).contains(redoSql)) {
LOGGER.trace("Ignored duplicated capture as of SCN={}, REDO_SQL={}", scn, redoSql);
return;
}
BigDecimal previousScn = transaction.redoSqlMap.floorKey(scn);
if (previousScn != null) {
if (transaction.redoSqlMap.get(previousScn) != null && transaction.redoSqlMap.get(previousScn).contains(redoSql)) {
LOGGER.debug("Ignored duplicated capture for the previous SCN={}, REDO_SQL={}", scn, redoSql);
return;
}
}
*/
// BigDecimal previousScn = transaction.redoSqlMap.floorKey(scn);
// if (previousScn != null) {
// if (transaction.redoSqlMap.get(previousScn) != null && transaction.redoSqlMap.get(previousScn).contains(redoSql)) {
// LOGGER.debug("Ignored duplicated capture for the previous SCN={}, REDO_SQL={}", scn, redoSql);
// return;
// }
// }

transaction.commitCallbacks.add(callback);
transaction.addRedoSql(scn, redoSql);
Expand Down Expand Up @@ -260,9 +262,15 @@ boolean rollback(String transactionId, String debugMessage) {
if (transaction != null) {
LOGGER.debug("Transaction {} rolled back", transactionId, debugMessage);
abandonedTransactionIds.remove(transactionId);
rolledBackTransactionIds.add(transactionId);

metrics.ifPresent(m -> m.setActiveTransactions(transactions.size()));
metrics.ifPresent(TransactionalBufferMetrics::incrementRolledBackTransactions);
metrics.ifPresent(m -> m.addRolledBackTransactionId(transactionId));

calculateSmallestScn();
calculateLargestScn();

return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class TransactionalBufferMetrics extends Metrics implements Transactional
private AtomicReference<Duration> minLagFromTheSource = new AtomicReference<>();
private AtomicReference<Duration> totalLagsFromTheSource = new AtomicReference<>();
private AtomicReference<Set<String>> abandonedTransactionIds = new AtomicReference<>();
private AtomicReference<Set<String>> rolledBackTransactionIds = new AtomicReference<>();
private Instant startTime;
private static long MILLIS_PER_SECOND = 1000L;

Expand All @@ -41,16 +42,7 @@ public class TransactionalBufferMetrics extends Metrics implements Transactional
startTime = Instant.now();
oldestScn.set(-1);
lagFromTheSource.set(Duration.ZERO);
maxLagFromTheSource.set(Duration.ZERO);
minLagFromTheSource.set(Duration.ZERO);
totalLagsFromTheSource.set(Duration.ZERO);
activeTransactions.set(0);
rolledBackTransactions.set(0);
committedTransactions.set(0);
capturedDmlCounter.set(0);
committedDmlCounter.set(0);
totalLagsFromTheSource.set(Duration.ZERO);
abandonedTransactionIds.set(new HashSet<>());
reset();
}

// setters
Expand Down Expand Up @@ -104,6 +96,12 @@ void addAbandonedTransactionId(String transactionId){
}
}

void addRolledBackTransactionId(String transactionId){
if (transactionId != null) {
rolledBackTransactionIds.get().add(transactionId);
}
}

// implemented getters
@Override
public Long getOldestScn() {
Expand Down Expand Up @@ -160,6 +158,26 @@ public Set<String> getAbandonedTransactionIds() {
return abandonedTransactionIds.get();
}

@Override
public Set<String> getRolledBackTransactionIds() {
return rolledBackTransactionIds.get();
}

@Override
public void reset() {
maxLagFromTheSource.set(Duration.ZERO);
minLagFromTheSource.set(Duration.ZERO);
totalLagsFromTheSource.set(Duration.ZERO);
activeTransactions.set(0);
rolledBackTransactions.set(0);
committedTransactions.set(0);
capturedDmlCounter.set(0);
committedDmlCounter.set(0);
totalLagsFromTheSource.set(Duration.ZERO);
abandonedTransactionIds.set(new HashSet<>());
rolledBackTransactionIds.set(new HashSet<>());
}

@Override
public String toString() {
return "TransactionalBufferMetrics{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,14 @@ public interface TransactionalBufferMetricsMXBean {
*/
Set<String> getAbandonedTransactionIds();

/**
* See which transactions were rolled back
* @return set of transaction IDs
*/
Set<String> getRolledBackTransactionIds();

/**
* action to reset some metrics
*/
void reset();
}

0 comments on commit 0585b2b

Please sign in to comment.