Skip to content

Commit

Permalink
Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreyIg committed May 7, 2020
2 parents 3f0c9de + a043ebd commit f1810b9
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 22 deletions.
Expand Up @@ -35,14 +35,16 @@ public class LogMinerMetrics extends Metrics implements LogMinerMetricsMXBean {
private AtomicReference<Duration> averageProcessedCapturedBatchDuration = new AtomicReference<>();
private AtomicInteger maxBatchSize = new AtomicInteger();
private AtomicInteger millisecondToSleepBetweenMiningQuery = new AtomicInteger();
private AtomicInteger fetchedRecordSizeLimitToFallAsleep = new AtomicInteger();
private AtomicInteger fetchedRecordSizeToSleepMore = new AtomicInteger(); // todo delete later
private final int MAX_SLEEP_TIME = 3_000;
private final int MIN_SLEEP_TIME = 100;

LogMinerMetrics(CdcSourceTaskContext taskContext) {
super(taskContext, "log-miner");

maxBatchSize.set(5_000);
millisecondToSleepBetweenMiningQuery.set(1000);
fetchedRecordSizeLimitToFallAsleep.set(50);
fetchedRecordSizeToSleepMore.set(50);

currentScn.set(-1);
capturedDmlCount.set(0);
Expand Down Expand Up @@ -154,8 +156,8 @@ public int getMillisecondToSleepBetweenMiningQuery() {
}

@Override
public int getFetchedRecordSizeLimitToFallAsleep() {
return fetchedRecordSizeLimitToFallAsleep.get();
public int getFetchedRecordSizeToSleepMore() {
return fetchedRecordSizeToSleepMore.get();
}

// MBean accessible setters
Expand All @@ -167,16 +169,32 @@ public void setMaxBatchSize(int size) {
}

@Override
public void setMillisecondToSleepBetweenMiningQuery(int milliseconds) {
if (milliseconds >= 100 && milliseconds <= 5_000){
public void setMillisecondToSleepBetweenMiningQuery(Integer milliseconds) {
if (milliseconds != null && milliseconds >= MIN_SLEEP_TIME && milliseconds < MAX_SLEEP_TIME){
millisecondToSleepBetweenMiningQuery.set(milliseconds);
}
}

@Override
public void setFetchedRecordSizeLimitToFallAsleep(int size) {
public void incrementSleepingTime() {
int sleepTime = millisecondToSleepBetweenMiningQuery.get();
if (sleepTime >= MIN_SLEEP_TIME && sleepTime < MAX_SLEEP_TIME){
millisecondToSleepBetweenMiningQuery.getAndAdd(200);
}
}

@Override
public void resetSleepingTime() {
int sleepTime = millisecondToSleepBetweenMiningQuery.get();
if (sleepTime >= MIN_SLEEP_TIME && sleepTime < MAX_SLEEP_TIME){
millisecondToSleepBetweenMiningQuery.set(100);
}
}

@Override
public void setFetchedRecordSizeToSleepMore(int size) {
if (size >= 50 && size <= 200) {
fetchedRecordSizeLimitToFallAsleep.set(size);
fetchedRecordSizeToSleepMore.set(size);
}
}

Expand Down
Expand Up @@ -97,17 +97,27 @@ public interface LogMinerMetricsMXBean {
* sets number of milliseconds for connector to sleep before fetching another batch from the Log Miner view
* @param milliseconds to sleep
*/
void setMillisecondToSleepBetweenMiningQuery(int milliseconds);
void setMillisecondToSleepBetweenMiningQuery(Integer milliseconds);

/**
* @return number of fetched records from Log Miner view. It serves as a trigger point for connector to sleep.
* This helps in reducing database impact by mining query by making it less frequent
*/
int getFetchedRecordSizeLimitToFallAsleep();
int getFetchedRecordSizeToSleepMore();

/**
* sets the limit of fetched records from Log Miner view.
* @param size number of records
*/
void setFetchedRecordSizeLimitToFallAsleep(int size);
void setFetchedRecordSizeToSleepMore(int size);

/**
* set sleeping time larger
*/
void incrementSleepingTime();

/**
* reset to the minimal value
*/
void resetSleepingTime();
}
Expand Up @@ -228,12 +228,11 @@ int processResult(ResultSet resultSet) {
currentOffsetCommitScn = offsetContext.getCommitScn();
}
LOGGER.debug("{} DMLs, {} Commits, {} Rollbacks were processed in {} milliseconds, commit time:{}, rollback time: {}, parse time:{}, " +
"other time:{}, lag:{}, offset scn:{}, offset commit scn:{}, active transactions:{}",
"other time:{}, lag:{}, offset scn:{}, offset commit scn:{}, active transactions:{}, sleep time:{}",
dmlCounter, commitCounter, rollbackCounter, (Duration.between(startTime, Instant.now()).toMillis()),
cumulativeCommitTime.toMillis(), cumulativeRollbackTime.toMillis(),
cumulativeParseTime.toMillis(), cumulativeOtherTime.toMillis(),
transactionalBufferMetrics.getLagFromSource(), offsetContext.getScn(),
offsetContext.getCommitScn(), transactionalBufferMetrics.getNumberOfActiveTransactions());
cumulativeCommitTime.toMillis(), cumulativeRollbackTime.toMillis(), cumulativeParseTime.toMillis(), cumulativeOtherTime.toMillis(),
transactionalBufferMetrics.getLagFromSource(), offsetContext.getScn(), offsetContext.getCommitScn(),
transactionalBufferMetrics.getNumberOfActiveTransactions(), metrics.getMillisecondToSleepBetweenMiningQuery());
}
return dmlCounter;
}
Expand Down
Expand Up @@ -137,7 +137,14 @@ public void execute(ChangeEventSourceContext context) {
metronome = Metronome.sleeper(Duration.ofMillis(logMinerMetrics.getMillisecondToSleepBetweenMiningQuery()), clock);

endScn = LogMinerHelper.getNextScn(connection, startScn, logMinerMetrics);
// this is to reduce the DB impact
// adjust sleeping time to optimize DB impact and catchup faster when lag is large
if (transactionalBufferMetrics.getLagFromSource() < 2000) {
logMinerMetrics.incrementSleepingTime();
}
if (transactionalBufferMetrics.getLagFromSource() > 10_000) {
logMinerMetrics.resetSleepingTime();
}
LOGGER.trace("sleeping for {} milliseconds", logMinerMetrics.getMillisecondToSleepBetweenMiningQuery());
metronome.pause();

LOGGER.trace("startScn: {}, endScn: {}", startScn, endScn);
Expand Down Expand Up @@ -174,11 +181,6 @@ public void execute(ChangeEventSourceContext context) {
logMinerMetrics.setLastLogMinerQueryDuration(Duration.between(startTime, Instant.now()));
int processedCount = processor.processResult(res);

if (processedCount < logMinerMetrics.getFetchedRecordSizeLimitToFallAsleep()) {
LOGGER.trace("sleeping for {} milliseconds", logMinerMetrics.getMillisecondToSleepBetweenMiningQuery());
metronome.pause();
}

updateStartScn();

// get largest scn from the last uncommitted transaction and set as last processed scn
Expand Down

0 comments on commit f1810b9

Please sign in to comment.