Skip to content

Commit

Permalink
Merge pull request debezium#22 in N4FRA/debezium from ARGO-219310_han…
Browse files Browse the repository at this point in the history
…dle_network_failures to master

Squashed commit of the following:

commit e03d7dc2b364f6184077ea4924b74515e2771599
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Jan 6 16:12:29 2020 -0800

    ARGO-219310, address oldest scn as minus 1

commit 7308947f28d583c72e656975a5a963ae5362f187
Merge: 1829d109 0fe3cf3
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Jan 6 16:11:28 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium into ARGO-219310_handle_network_failures

commit 1829d109aca0e2588453e9c6cbc192232383ae27
Merge: 8cbded86 7ab9af3
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Dec 27 11:51:53 2019 -0800

    ARGO-219310, hadle network failures

commit 7ab9af3
Merge: 90979c1 d8872ce
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Dec 27 11:48:18 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 8cbded8659b2e7b8878358fd1ece2afe89c8f6d2
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Dec 27 11:44:40 2019 -0800

    ARGO-219310, handle network failures

commit 98849be67ee3f0cd780613e04c62b9d973b1b178
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Dec 20 08:44:25 2019 -0800

    ARGO-219310, handle network failures

commit 90979c1
Merge: d174eab 4086b3e
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Dec 13 13:39:26 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit d174eab
Merge: 0894de1 7e77a83
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Dec 9 08:58:41 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 0894de1
Merge: 8353b85 12021bd
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Nov 27 15:12:39 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 8353b85
Merge: c6f361f b69f88c
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Nov 27 14:57:47 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit c6f361f
Merge: 7b499a3 7c9497a
Author: AndreyIg <gnyiny@gmail.com>
Date:   Tue Nov 26 15:27:32 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 7b499a3
Merge: ff19baa 5be9ef5
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Nov 22 14:09:47 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit ff19baa
Merge: 7b0b476 0309d35
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Sep 27 11:04:12 2019 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 7b0b476
Merge: 5d76700 eaaddfb
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu Sep 5 15:52:31 2019 -0700

    ARGO-198431, new parser

commit 5d76700
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu Sep 5 15:01:49 2019 -0700

    ARGO-198431, new parser

commit 838f944
Merge: 9f00dfd 61d8afc
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Aug 23 13:16:37 2019 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 9f00dfd
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Aug 23 13:03:44 2019 -0700

    ARGO-198431, incremental changes Oracle

commit c749ffd
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu Aug 22 15:45:44 2019 -0700

    initial Oracle connector checkin
  • Loading branch information
Ignatenko Andrey committed Jan 7, 2020
1 parent 0fe3cf3 commit f9de59c
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ public int processResult(ResultSet resultSet) {
LOGGER.trace("COMMIT, {}", logMessage);
if (transactionalBuffer.commit(txId, changeTime, context, logMessage)){
commitCounter++;
cumulativeCommitTime = cumulativeCommitTime.plus(Duration.between(iterationStart, Instant.now()));
}
cumulativeCommitTime = cumulativeCommitTime.plus(Duration.between(iterationStart, Instant.now()));
continue;
}

Expand All @@ -115,8 +115,8 @@ public int processResult(ResultSet resultSet) {
LOGGER.trace("ROLLBACK, {}", logMessage);
if (transactionalBuffer.rollback(txId, logMessage)){
rollbackCounter++;
cumulativeRollbackTime = cumulativeRollbackTime.plus(Duration.between(iterationStart, Instant.now()));
}
cumulativeRollbackTime = cumulativeRollbackTime.plus(Duration.between(iterationStart, Instant.now()));
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,13 +205,9 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
throw new RuntimeException(e);
} finally {
LOGGER.info("lastProcessedScn={}, nextScn={}, offsetContext.getScn()={}", lastProcessedScn, nextScn, offsetContext.getScn());
if (transactionalBuffer != null) {
LOGGER.info("Transactional metrics dump: {}", transactionalBufferMetrics.toString());
LOGGER.info("Transactional buffer dump: {}", transactionalBuffer.toString());
}
if (logMinerMetrics != null) {
LOGGER.info("LogMiner metrics dump: {}", logMinerMetrics.toString());
}
LOGGER.info("Transactional buffer metrics dump: {}", transactionalBufferMetrics.toString());
LOGGER.info("Transactional buffer dump: {}", transactionalBuffer.toString());
LOGGER.info("LogMiner metrics dump: {}", logMinerMetrics.toString());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ void registerCommitCallback(String transactionId, BigDecimal scn, Instant change
* @return true if committed transaction is in the buffer
*/
boolean commit(String transactionId, Timestamp timestamp, ChangeEventSource.ChangeEventSourceContext context, String debugMessage) {
BigDecimal smallestScn = transactions.isEmpty() ? null : calculateSmallestScn();
Transaction transaction = transactions.remove(transactionId);
if (transaction == null) {
return false;
Expand All @@ -110,7 +111,6 @@ boolean commit(String transactionId, Timestamp timestamp, ChangeEventSource.Chan
abandonedTransactionIds.remove(transactionId);

List<CommitCallback> commitCallbacks = transaction.commitCallbacks;
BigDecimal smallestScn = transactions.isEmpty() ? null : calculateSmallestScn();
LOGGER.trace("COMMIT, {}, smallest SCN: {}", debugMessage, smallestScn);
executor.execute(() -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ void setOldestScn(Long scn){
oldestScn.set(scn);
}

// todo deal with timezones
void setLagFromTheSource(Instant changeTime){
if (changeTime != null) {
lagFromTheSource.set(Duration.between(changeTime, Instant.now()));
Expand Down Expand Up @@ -171,7 +172,7 @@ public String toString() {
", committedDmlCounter=" + committedDmlCounter.get() +
", maxLagFromTheSource=" + maxLagFromTheSource.get() +
", minLagFromTheSource=" + minLagFromTheSource.get() +
", totalLagsFromTheSource=" + totalLagsFromTheSource.get() +
", averageLagsFromTheSource=" + getAverageLagFromSource() +
", abandonedTransactionIds=" + abandonedTransactionIds.get() +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void testCalculateSmallestScnWhenTransactionIsCommitted() throws Interrup
});
transactionalBuffer.commit(TRANSACTION_ID, TIMESTAMP, () -> true, MESSAGE);
commitLatch.await();
assertThat(smallestScnContainer.get()).isNull();
assertThat(smallestScnContainer.get()).isEqualTo(SCN);
}

@Test
Expand All @@ -120,7 +120,7 @@ public void testCalculateSmallestScnWhenFirstTransactionIsCommitted() throws Int
transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn) -> { });
transactionalBuffer.commit(TRANSACTION_ID, TIMESTAMP, () -> true, MESSAGE);
commitLatch.await();
assertThat(smallestScnContainer.get()).isEqualTo(OTHER_SCN);
assertThat(smallestScnContainer.get()).isEqualTo(SCN);
}

@Test
Expand Down

0 comments on commit f9de59c

Please sign in to comment.