Merge pull request debezium#53 in N4FRA/debezium from DSCON-364_JSQLParserException_with_new_tables to master

Squashed commit of the following:

commit dd4b64677940330a48f32dc5e2c6a76dcbb18cc2
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri May 22 14:26:47 2020 -0700

    DSCON-364, JSQLParserException with new tables

commit aaa85742ecd60ace8e6baaa85aafb84fb3b7ff37
Merge: df3d36ac 796bf4b
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri May 15 12:19:28 2020 -0700

    Merge branch 'master' into ARGO-209312_DBC_Oracle_RAC

commit 796bf4b
Merge: 8b36431 2c1fc9e
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri May 15 12:19:16 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit df3d36ac7d0d8a7927d90b47fd6c92555644f285
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri May 15 12:16:18 2020 -0700

    ARGO-209312, db connector on Oracle RAC

commit aac6d3a4110a4ec08b39f68a39ec1e37eb66e7eb
Merge: 44bc0414 8b36431
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri May 15 12:15:50 2020 -0700

    Merge branch 'master' into ARGO-209312_DBC_Oracle_RAC

commit 8b36431
Merge: 66c207f 70ad303
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed May 13 16:37:01 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 44bc04142f38740940ef247540e1b370d1754e11
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed May 13 16:34:20 2020 -0700

    ARGO-209312, db connector on Oracle RAC

commit 34e10b92b354dbc3804f4c9ed90eb27a9d8f5fa4
Merge: c953356a 66c207f
Author: AndreyIg <gnyiny@gmail.com>
Date:   Tue May 12 11:18:25 2020 -0700

    Merge branch 'master' into DSCON-301_DBC_Crashed_after_hours_downtime

commit 66c207f
Merge: f1810b9 d5de8d8
Author: AndreyIg <gnyiny@gmail.com>
Date:   Tue May 12 11:17:49 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit c953356a2b5aed88846be8be41f97249f0f96570
Author: AndreyIg <gnyiny@gmail.com>
Date:   Tue May 12 09:51:42 2020 -0700

    DSCON-301, DBC Crashed after 2 hours of downtime

commit 3ec1c0bc00d9f7e9737292660cf7f4989b26888a
Merge: bccb2c81 f1810b9
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu May 7 15:00:23 2020 -0700

    Merge branch 'master' into DSCON-268_manage_millisecondToSleepBetweenMiningQuery

commit f1810b9
Merge: 3f0c9de a043ebd
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu May 7 14:59:46 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit bccb2c81683362ece4dcc8bd162dba3144ced410
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu May 7 14:56:49 2020 -0700

    DSCON-268, manage millisecondToSleepBetweenMiningQuery value

commit ac591b18fd5b1f5def9126e1ac25ee4c6c5649df
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu May 7 14:54:30 2020 -0700

    DSCON-268, manage millisecondToSleepBetweenMiningQuery value

commit 6abfd55c1a6e740dd22c0977c42d1baa834dc47f
Merge: 38ac1d4d 3f0c9de
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu May 7 14:52:44 2020 -0700

    Merge branch 'master' into DSCON-268_manage_millisecondToSleepBetweenMiningQuery

commit 3f0c9de
Merge: 2de0428 5481902
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu May 7 14:52:06 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 38ac1d4d0ae19bcfcc1e110d509e11606f5ff668
Merge: c4d2db35 2de0428
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon May 4 11:46:18 2020 -0700

    Merge branch 'master' into DSCON-262_Analyse_address_parsing_errors

commit 2de0428
Merge: b514c5e 274e969
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon May 4 11:46:06 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit c4d2db3592eb492890039726eb1ad5889b5dfa1b
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon May 4 11:43:43 2020 -0700

    DSCON-262, Analyse and address parsing errors

commit c8db7e91013df5e2baf22ce981cc0ff9577eb4cf
Merge: d29c1e89 b514c5e
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon May 4 11:42:37 2020 -0700

    Merge branch 'master' into DSCON-262_Analyse_address_parsing_errors

... and 46 more commits
Ignatenko Andrey committed May 22, 2020
1 parent 2c1fc9e commit 7c49efd
Showing 8 changed files with 56 additions and 83 deletions.
@@ -96,6 +96,7 @@ public LogMinerRowLcr parse(String dmlContent, Tables tables, String txId){
         }
         // this is to handle cases when a record contains escape character(s). This parser throws.
         dmlContent = dmlContent.replaceAll("\\\\", "\\\\\\\\");
+        dmlContent = dmlContent.replaceAll("= Unsupported Type", "= null"); // todo address spatial data types

         newColumnValues.clear();
         oldColumnValues.clear();
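Note: a minimal standalone sketch of the two pre-parse rewrites above (the sample DML string is hypothetical, not taken from this change):

    // LogMiner can emit single backslashes that JSqlParser rejects; double them first.
    String dml = "update \"SCOTT\".\"T\" set \"COL11\" = Unsupported Type where \"COL1\" = 'a\\b';";
    dml = dml.replaceAll("\\\\", "\\\\\\\\");             // 'a\b' becomes 'a\\b'
    // LogMiner prints "Unsupported Type" for values it cannot reconstruct (e.g. spatial columns);
    // mapping them to null keeps the statement parseable.
    dml = dml.replaceAll("= Unsupported Type", "= null"); // ... set "COL11" = null where ...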
@@ -91,20 +91,24 @@ public static void createAuditTable(Connection connection) throws SQLException {
      *
      * @param connection container level database connection
      * @param metrics MBean accessible metrics
-     * @param lastProcessesScn offset SCN
+     * @param startScn start SCN
      * @return next SCN to mine to
      * @throws SQLException if anything unexpected happens
      */
-    static long getNextScn(Connection connection, long lastProcessesScn, LogMinerMetrics metrics) throws SQLException {
+    static long getEndScn(Connection connection, long startScn, LogMinerMetrics metrics) throws SQLException {
         long currentScn = getCurrentScn(connection);
         metrics.setCurrentScn(currentScn);
-        int miningDiapason = metrics.getMaxBatchSize();
+        int miningDiapason = metrics.getBatchSize();

-        // it is critical to commit to flush LogWriter buffer
+        // it is critical to flush LogWriter buffer
         executeCallableStatement(connection, SqlUtils.UPDATE_AUDIT_TABLE + currentScn);
         executeCallableStatement(connection, "commit");

-        return currentScn < (lastProcessesScn + miningDiapason) ? currentScn : lastProcessesScn + miningDiapason;
+        // adjust sleeping time to optimize DB impact and catchup faster when behind
+        boolean isNextScnCloseToDbCurrent = currentScn < (startScn + miningDiapason);
+        metrics.changeSleepingTime(isNextScnCloseToDbCurrent);
+
+        return isNextScnCloseToDbCurrent ? currentScn : startScn + miningDiapason;
     }

     /**
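Note: the renamed getEndScn now does two jobs: it caps the mining window and feeds the adaptive sleep. A condensed sketch of the capping rule (helper name hypothetical):

    // Mine up to the database's current SCN, but never more than batchSize SCNs per iteration.
    static long capEndScn(long currentScn, long startScn, int batchSize) {
        boolean caughtUp = currentScn < (startScn + batchSize);
        // the same flag tells the metrics to lengthen (true) or shorten (false) the sleep time
        return caughtUp ? currentScn : startScn + batchSize;
    }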
@@ -333,22 +337,21 @@ static void setRedoLogFilesForMining(Connection connection, Long lastProcessedSc

     /**
      * This method returns SCN as a watermark to abandon long lasting transactions.
+     * This is a way to mitigate long lasting transactions when it could fall out of online redo logs range
      *
      * @param connection connection
-     * @param lastProcessedScn current offset
+     * @param offsetScn current offset
      * @return Optional last SCN in a redo log
      * @throws SQLException if anything unexpected happens
      */
-    static Optional<Long> getLastScnFromTheOldestOnlineRedo(Connection connection, Long lastProcessedScn) throws SQLException {
+    static Optional<Long> getLastScnFromTheOldestOnlineRedo(Connection connection, Long offsetScn) throws SQLException {

         Map<String, String> allOnlineRedoLogFiles = getMap(connection, SqlUtils.ALL_ONLINE_LOGS, "-1");
+        Map<String, Long> logFilesToMine = getLogFilesForOffsetScn(connection, offsetScn);

-        Map<String, Long> logFilesToMine = getLogFilesForOffsetScn(connection, lastProcessedScn);
         LOGGER.debug("Redo log size = {}, needed for mining files size = {}", allOnlineRedoLogFiles.size(), logFilesToMine.size());

         if (allOnlineRedoLogFiles.size() - logFilesToMine.size() <= 1){
-            List<Long> lastScnInOldestOnlineRedo = logFilesToMine.entrySet().stream().map(Map.Entry::getValue).collect(Collectors.toList());
-            return lastScnInOldestOnlineRedo.stream().min(Long::compareTo);
+            List<Long> lastScnsInRedoLogToMine = logFilesToMine.entrySet().stream().map(Map.Entry::getValue).collect(Collectors.toList());
+            return lastScnsInRedoLogToMine.stream().min(Long::compareTo);
         }
         return Optional.empty();
     }
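Note: a compact restatement of the watermark rule above, assuming the same data structures: once at most one online redo log remains outside the mining set, the smallest "last SCN" among the logs being mined becomes the abandonment watermark.

    // logFilesToMine: redo log name -> last SCN it contains (as built by getLogFilesForOffsetScn)
    if (allOnlineRedoLogFiles.size() - logFilesToMine.size() <= 1) {
        return logFilesToMine.values().stream().min(Long::compareTo); // oldest mined log's last SCN
    }
    return Optional.empty();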
@@ -34,27 +34,24 @@ public class LogMinerMetrics extends Metrics implements LogMinerMetricsMXBean {
     private AtomicReference<Duration> lastProcessedCapturedBatchDuration = new AtomicReference<>();
     private AtomicInteger processedCapturedBatchCount = new AtomicInteger();
     private AtomicReference<Duration> averageProcessedCapturedBatchDuration = new AtomicReference<>();
-    private AtomicInteger maxBatchSize = new AtomicInteger();
+    private AtomicInteger batchSize = new AtomicInteger();
     private AtomicInteger millisecondToSleepBetweenMiningQuery = new AtomicInteger();
-    private AtomicInteger fetchedRecordSizeToSleepMore = new AtomicInteger();

     private final int MAX_SLEEP_TIME = 3_000;
     private final int DEFAULT_SLEEP_TIME = 1_000;
     private final int MIN_SLEEP_TIME = 100;

-    private final int MIN_BATCH_SIZE = 100;
+    private final int MIN_BATCH_SIZE = 1_000;
     private final int MAX_BATCH_SIZE = 100_000;
-    private final int DEFAULT_BATCH_SIZE = 10_000;
+    private final int DEFAULT_BATCH_SIZE = 5_000;

     private final int SLEEP_TIME_INCREMENT = 200;
-    private final int SIZE_TO_SLEEP_LONGER = 50;

     LogMinerMetrics(CdcSourceTaskContext taskContext) {
         super(taskContext, "log-miner");

-        maxBatchSize.set(DEFAULT_BATCH_SIZE);
+        batchSize.set(DEFAULT_BATCH_SIZE);
         millisecondToSleepBetweenMiningQuery.set(DEFAULT_SLEEP_TIME);
-        fetchedRecordSizeToSleepMore.set(SIZE_TO_SLEEP_LONGER);

         currentScn.set(-1);
         capturedDmlCount.set(0);
@@ -156,25 +153,20 @@ public Long getAverageProcessedCapturedBatchDuration() {
     }

     @Override
-    public int getMaxBatchSize() {
-        return maxBatchSize.get();
+    public int getBatchSize() {
+        return batchSize.get();
     }

     @Override
     public Integer getMillisecondToSleepBetweenMiningQuery() {
         return millisecondToSleepBetweenMiningQuery.get();
     }

-    @Override
-    public int getFetchedRecordSizeToSleepMore() {
-        return fetchedRecordSizeToSleepMore.get();
-    }
-
     // MBean accessible setters
     @Override
-    public void setMaxBatchSize(int size) {
+    public void setBatchSize(int size) {
         if (size >= MIN_BATCH_SIZE && size <= MAX_BATCH_SIZE) {
-            maxBatchSize.set(size);
+            batchSize.set(size);
         }
     }

@@ -186,18 +178,11 @@ public void setMillisecondToSleepBetweenMiningQuery(Integer milliseconds) {
     }

     @Override
-    public void incrementSleepingTime() {
-        int sleepTime = millisecondToSleepBetweenMiningQuery.get();
-        if (sleepTime >= MIN_SLEEP_TIME && sleepTime < MAX_SLEEP_TIME){
-            millisecondToSleepBetweenMiningQuery.getAndAdd(SLEEP_TIME_INCREMENT);
-        }
-    }
-
-    @Override
-    public void resetSleepingTime() {
+    public void changeSleepingTime(boolean increment) {
         int sleepTime = millisecondToSleepBetweenMiningQuery.get();
+        int change = increment ? SLEEP_TIME_INCREMENT : -SLEEP_TIME_INCREMENT;
         if (sleepTime >= MIN_SLEEP_TIME && sleepTime < MAX_SLEEP_TIME){
-            millisecondToSleepBetweenMiningQuery.set(MIN_SLEEP_TIME);
+            millisecondToSleepBetweenMiningQuery.getAndAdd(change);
         }
     }
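Note: incrementSleepingTime/resetSleepingTime collapse into one bounded step function. A standalone sketch of the rule, with the constants above inlined:

    // One +-200 ms step per mining iteration while the current value is inside [100, 3_000).
    static int adjustSleep(int sleepMs, boolean increment) {
        int change = increment ? 200 : -200;
        return (sleepMs >= 100 && sleepMs < 3_000) ? sleepMs + change : sleepMs;
    }

Observe that the guard tests the value before the step is applied, so a decrement taken at exactly the minimum can undershoot it.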

@@ -226,9 +211,8 @@ public String toString() {
             ", lastProcessedCapturedBatchDuration=" + lastProcessedCapturedBatchDuration.get() +
             ", processedCapturedBatchCount=" + processedCapturedBatchCount.get() +
             ", averageProcessedCapturedBatchDuration=" + averageProcessedCapturedBatchDuration.get() +
-            ", maxBatchSize=" + maxBatchSize.get() +
             ", millisecondToSleepBetweenMiningQuery=" + millisecondToSleepBetweenMiningQuery.get() +
-            ", maxBatchSize=" + maxBatchSize.get() +
+            ", batchSize=" + batchSize.get() +
             '}';
     }
 }
@@ -76,17 +76,17 @@ public interface LogMinerMetricsMXBean {

     /**
      * Maximum number of entries in Log Miner view to fetch. This is used to set the diapason of the SCN in mining query.
-     * If difference between "start SCN" and "end SCN" to mine exceeds this limit, end SCN will be set to "start SCN" + maxBatchSize
+     * If difference between "start SCN" and "end SCN" to mine exceeds this limit, end SCN will be set to "start SCN" + batchSize
      * @return the limit
      */
-    int getMaxBatchSize();
+    int getBatchSize();

     /**
      * this gives ability to manipulate maximum number of entries in Log Miner view to fetch.
      * It has limits to prevent abnormal values
      * @param size limit
      */
-    void setMaxBatchSize(int size);
+    void setBatchSize(int size);

     /**
      * @return number of milliseconds for connector to sleep before fetching another batch from the Log Miner view
@@ -100,18 +100,8 @@
     void setMillisecondToSleepBetweenMiningQuery(Integer milliseconds);

     /**
-     * @return number of fetched records from Log Miner view. It serves as a trigger point for connector to sleep.
-     * This helps in reducing database impact by mining query by making it less frequent
+     * change sleeping time
+     * @param increment true to add, false to deduct
      */
-    int getFetchedRecordSizeToSleepMore();
-
-    /**
-     * set sleeping time larger
-     */
-    void incrementSleepingTime();
-
-    /**
-     * reset to the minimal value
-     */
-    void resetSleepingTime();
+    void changeSleepingTime(boolean increment);
 }
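Note: both remaining knobs stay reachable over JMX. A hypothetical client sketch (the ObjectName below is illustrative, not defined anywhere in this diff):

    import javax.management.Attribute;
    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;

    static void tuneLogMiner(MBeanServerConnection mbs) throws Exception {
        ObjectName name = new ObjectName("debezium.oracle:type=connector-metrics,context=log-miner,server=myserver");
        mbs.setAttribute(name, new Attribute("BatchSize", 20_000)); // silently clamped to [1_000, 100_000]
        mbs.invoke(name, "changeSleepingTime", new Object[]{ Boolean.TRUE }, new String[]{ "boolean" });
    }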
@@ -224,7 +224,7 @@ int processResult(ResultSet resultSet) {
         if (offsetContext.getCommitScn() != null) {
             currentOffsetCommitScn = offsetContext.getCommitScn();
         }
-        LOGGER.debug("{} DMLs, {} Commits, {} Rollbacks. Timing in msec (total:{}, commit:{}, parse:{}, other:{}). " +
+        LOGGER.debug("{} DMLs, {} Commits, {} Rollbacks. Millis - (total:{}, commit:{}, parse:{}, other:{}). " +
                 "Lag:{}. Offset scn:{}. Offset commit scn:{}. Active transactions:{}. Sleep time:{}",
                 dmlCounter, commitCounter, rollbackCounter, (Duration.between(startTime, Instant.now()).toMillis()),
                 cumulativeCommitTime.toMillis(), cumulativeParseTime.toMillis(), cumulativeOtherTime.toMillis(),
@@ -96,7 +96,6 @@ public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig,
     @Override
     public void execute(ChangeEventSourceContext context) {
         Metronome metronome;
-        int processedCount = logMinerMetrics.getFetchedRecordSizeToSleepMore();

         // The top outer loop gives the resiliency on the network disconnections. This is critical for cloud deployment.
         while(context.isRunning()) {
@@ -137,15 +136,8 @@ public void execute(ChangeEventSourceContext context) {
             while (context.isRunning()) {
                 metronome = Metronome.sleeper(Duration.ofMillis(logMinerMetrics.getMillisecondToSleepBetweenMiningQuery()), clock);

-                endScn = LogMinerHelper.getNextScn(connection, startScn, logMinerMetrics);
+                endScn = LogMinerHelper.getEndScn(connection, startScn, logMinerMetrics);

-                // adjust sleeping time to optimize DB impact and catchup faster when lag is large
-                if (transactionalBufferMetrics.getLagFromSource() < 2000 && processedCount < logMinerMetrics.getFetchedRecordSizeToSleepMore()) {
-                    logMinerMetrics.incrementSleepingTime();
-                }
-                if (transactionalBufferMetrics.getLagFromSource() > 10_000) {
-                    logMinerMetrics.resetSleepingTime();
-                }
                 LOGGER.trace("sleeping for {} milliseconds", logMinerMetrics.getMillisecondToSleepBetweenMiningQuery());
                 metronome.pause();

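Note: with this change the per-iteration flow becomes: compute the end SCN (which, as a side effect, adapts the sleep time), sleep, then mine. A condensed paraphrase of the loop (not verbatim from the diff):

    while (context.isRunning()) {
        metronome = Metronome.sleeper(Duration.ofMillis(logMinerMetrics.getMillisecondToSleepBetweenMiningQuery()), clock);
        endScn = LogMinerHelper.getEndScn(connection, startScn, logMinerMetrics); // adjusts sleep up/down
        metronome.pause();                                                        // sleep between mining queries
        // ... start the mining session for (startScn, endScn] and process the view ...
    }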
@@ -175,13 +167,13 @@ public void execute(ChangeEventSourceContext context) {
                 LogMinerHelper.startOnlineMining(connection, startScn, endScn, strategy, isContinuousMining);

                 Instant startTime = Instant.now();
-                fetchFromMiningView.setFetchSize(10000); // todo parametrize
+                fetchFromMiningView.setFetchSize(10_000);
                 fetchFromMiningView.setLong(1, startScn);
                 fetchFromMiningView.setLong(2, endScn);

                 ResultSet res = fetchFromMiningView.executeQuery();
                 logMinerMetrics.setLastLogMinerQueryDuration(Duration.between(startTime, Instant.now()));
-                processedCount = processor.processResult(res);
+                processor.processResult(res);

                 updateStartScn();

@@ -222,10 +214,10 @@ public void execute(ChangeEventSourceContext context) {

     private void abandonOldTransactionsIfExist(Connection connection) throws SQLException {
         Optional<Long> lastScnToAbandonTransactions = LogMinerHelper.getLastScnFromTheOldestOnlineRedo(connection, offsetContext.getScn());
-        lastScnToAbandonTransactions.ifPresent(nextOffsetScn -> {
-            transactionalBuffer.abandonLongTransactions(nextOffsetScn);
-            LOGGER.debug("After abandoning, offset before: {}, offset after:{}", offsetContext.getScn(), nextOffsetScn);
-            offsetContext.setScn(nextOffsetScn);
+        lastScnToAbandonTransactions.ifPresent(thresholdScn -> {
+            LOGGER.debug("All transactions with first SCN <= {} will be abandoned, offset: {}", thresholdScn, offsetContext.getScn());
+            transactionalBuffer.abandonLongTransactions(thresholdScn);
+            offsetContext.setScn(thresholdScn);
             updateStartScn();
         });
     }
Expand Up @@ -148,14 +148,14 @@ void registerCommitCallback(String transactionId, BigDecimal scn, Instant change

/**
* @param transactionId transaction identifier
* @param commitScn SCN of the commit.
* @param scn SCN of the commit.
* @param offsetContext Oracle offset
* @param timestamp commit timestamp
* @param context context to check that source is running
* @param debugMessage message
* @return true if committed transaction is in the buffer and was not processed already
* @return true if committed transaction is in the buffer, was not processed yet and processed now
*/
boolean commit(String transactionId, BigDecimal commitScn, OracleOffsetContext offsetContext, Timestamp timestamp,
boolean commit(String transactionId, BigDecimal scn, OracleOffsetContext offsetContext, Timestamp timestamp,
ChangeEventSource.ChangeEventSourceContext context, String debugMessage) {

Transaction transaction = transactions.get(transactionId);
@@ -170,9 +170,11 @@ boolean commit(String transactionId, BigDecimal commitScn, OracleOffsetContext o
         taskCounter.incrementAndGet();
         abandonedTransactionIds.remove(transactionId);

-        if ((offsetContext.getCommitScn() != null && offsetContext.getCommitScn() >= commitScn.longValue()) || lastCommittedScn.longValue() >= commitScn.longValue()) {
-            LOGGER.info("Transaction {} was already processed, ignore. Committed SCN in offset is {}, commit SCN of the transaction is {}",
-                    transactionId, offsetContext.getCommitScn(), commitScn);
+        // On the restarting connector, we start from SCN in the offset. There is possibility to commit a transaction(s) which were already committed.
+        // Currently we cannot use ">=", because we may lose normal commit which may happen at the same time. TODO use audit table to prevent duplications
+        if ((offsetContext.getCommitScn() != null && offsetContext.getCommitScn() > scn.longValue()) || lastCommittedScn.longValue() > scn.longValue()) {
+            LOGGER.warn("Transaction {} was already processed, ignore. Committed SCN in offset is {}, commit SCN of the transaction is {}, last committed SCN is {}",
+                    transactionId, offsetContext.getCommitScn(), scn, lastCommittedScn);
             metrics.ifPresent(m -> m.setActiveTransactions(transactions.size()));
             return false;
         }
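Note: the switch from ">=" to ">" trades possible duplicates for no loss, per the TODO comment above. A worked illustration (SCN values hypothetical):

    long offsetCommitScn = 1_000_000L; // commit SCN already persisted in the offset
    long txB = 1_000_000L;             // a different transaction committed at the same SCN
    boolean skipWithGte = offsetCommitScn >= txB; // true  -> txB would be lost
    boolean skipWithGt  = offsetCommitScn >  txB; // false -> txB is emitted (a repeat of the first transaction is tolerated)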
@@ -186,14 +188,14 @@ boolean commit(String transactionId, BigDecimal commitScn, OracleOffsetContext o
                     if (!context.isRunning()) {
                         return;
                     }
-                    callback.execute(timestamp, smallestScn, commitScn, --counter);
+                    callback.execute(timestamp, smallestScn, scn, --counter);
                 }

-                lastCommittedScn = new BigDecimal(commitScn.longValue());
+                lastCommittedScn = new BigDecimal(scn.longValue());
                 metrics.ifPresent(TransactionalBufferMetrics::incrementCommittedTransactions);
                 metrics.ifPresent(m -> m.setActiveTransactions(transactions.size()));
                 metrics.ifPresent(m -> m.incrementCommittedDmlCounter(commitCallbacks.size()));
-                metrics.ifPresent(m -> m.setCommittedScn(commitScn.longValue()));
+                metrics.ifPresent(m -> m.setCommittedScn(scn.longValue()));
             } catch (InterruptedException e) {
                 LOGGER.error("Thread interrupted during running", e);
                 Thread.currentThread().interrupt();
@@ -218,7 +220,7 @@ boolean rollback(String transactionId, String debugMessage) {

         Transaction transaction = transactions.get(transactionId);
         if (transaction != null) {
-            LOGGER.debug("Transaction {} rolled back, {}", transactionId, debugMessage);
+            LOGGER.debug("Transaction rolled back, {}", debugMessage);

             calculateLargestScn(); // in case if largest SCN was in this transaction
             transactions.remove(transactionId);
Expand Up @@ -200,14 +200,15 @@ public void shouldParseUpdateNoChangesTable() throws Exception {

String dml = "update \"" + FULL_TABLE_NAME + "\" set \"col1\" = '6', col2 = 'text', col3 = 'text', col4 = NULL " +
"where ID = 5 and COL1 = 6 and \"COL2\" = 'text' " +
"and COL3 = 'text' and COL4 IS NULL and \"COL5\" IS NULL and COL6 IS NULL and COL7 IS NULL and COL9 IS NULL and COL10 IS NULL and COL12 IS NULL " +
"and COL3 = Unsupported Type and COL4 IS NULL and \"COL5\" IS NULL and COL6 IS NULL and COL7 IS NULL and COL9 IS NULL and COL10 IS NULL and COL12 IS NULL " +
"and COL8 = TO_TIMESTAMP('2019-05-14 02:28:32') and col11 = " + SPATIAL_DATA + ";";

LogMinerRowLcr record = sqlDmlParser.parse(dml, tables, "");
boolean pass = record.getCommandType().equals(Envelope.Operation.UPDATE)
&& record.getOldValues().size() == record.getNewValues().size()
&& record.getNewValues().containsAll(record.getOldValues());
assertThat(pass);
assertThat(record.getOldValues().get(4).getColumnData()).isNull();
}

@Test
Expand Down