Skip to content

Commit

Permalink
Merge branch 'master' into DBZ-137_candidate_merge_branch
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreyIg committed Jun 27, 2020
2 parents 9dc2116 + 52dfe6a commit c76688c
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 48 deletions.
Expand Up @@ -89,7 +89,8 @@ public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig,
this.dmlParser = new SimpleDmlParser(catalogName, connectorConfig.getSchemaName(), converters);
this.transactionalBufferMetrics = new TransactionalBufferMetrics(taskContext);
this.transactionalBufferMetrics.register(LOGGER);
transactionalBuffer = new TransactionalBuffer(connectorConfig.getLogicalName(), errorHandler, transactionalBufferMetrics);
transactionalBuffer = new TransactionalBuffer(connectorConfig.getLogicalName(), errorHandler,
transactionalBufferMetrics, connectorConfig.getMaxQueueSize());
this.logMinerMetrics = new LogMinerMetrics(taskContext);
this.logMinerMetrics.register(LOGGER);
this.strategy = connectorConfig.getLogMiningStrategy();
Expand All @@ -106,7 +107,7 @@ public void execute(ChangeEventSourceContext context) {
Metronome metronome;

// The top outer loop gives the resiliency on the network disconnections. This is critical for cloud deployment.
while(context.isRunning()) {
while (context.isRunning()) {
try (Connection connection = jdbcConnection.connection(false);
PreparedStatement fetchFromMiningView =
connection.prepareStatement(SqlUtils.queryLogMinerContents(connectorConfig.getSchemaName(), jdbcConnection.username(), schema))) {
Expand Down Expand Up @@ -220,7 +221,7 @@ private void abandonOldTransactionsIfExist(Connection connection) throws SQLExce
}

private void updateStartScn() {
long nextStartScn = transactionalBuffer.getLargestScn().equals(BigDecimal.ZERO) ? endScn : transactionalBuffer.getLargestScn().longValue();
long nextStartScn = transactionalBuffer.getLargestScn().equals(BigDecimal.ZERO) ? endScn : transactionalBuffer.getLargestScn().longValue();
if (nextStartScn <= startScn) {
// When system is idle, largest SCN may stay unchanged, move it forward then
transactionalBuffer.resetLargestScn(endScn);
Expand All @@ -233,11 +234,11 @@ public void commitOffset(Map<String, ?> offset) {
// nothing to do
}

private boolean connectionProblem (Throwable e){
private boolean connectionProblem(Throwable e) {
if (e.getMessage() == null || e.getCause() == null) {
return false;
}
return e.getMessage().startsWith("ORA-03135") || // connection lost contact
return e.getMessage().startsWith("ORA-03135") || // connection lost contact
e.getMessage().startsWith("ORA-12543") || // TNS:destination host unreachable
e.getMessage().startsWith("ORA-00604") || // error occurred at recursive SQL level 1
e.getMessage().startsWith("ORA-01089") || // Oracle immediate shutdown in progress
Expand Down
Expand Up @@ -27,9 +27,13 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/**
* @author Andrey Pustovetov
Expand All @@ -46,6 +50,7 @@ public final class TransactionalBuffer {
private final ExecutorService executor;
private final AtomicInteger taskCounter;
private final ErrorHandler errorHandler;
private final Supplier<Integer> commitQueueCapacity;
private TransactionalBufferMetrics metrics;
private final Set<String> abandonedTransactionIds;

Expand All @@ -60,13 +65,20 @@ public final class TransactionalBuffer {
/**
* Constructor to create a new instance.
*
* @param logicalName logical name
* @param errorHandler logError handler
* @param metrics metrics MBean
* @param logicalName logical name
* @param errorHandler logError handler
* @param metrics metrics MBean
* @param inCommitQueueCapacity commit queue capacity. On overflow, caller runs task
*/
TransactionalBuffer(String logicalName, ErrorHandler errorHandler, TransactionalBufferMetrics metrics) {
TransactionalBuffer(String logicalName, ErrorHandler errorHandler, TransactionalBufferMetrics metrics, int inCommitQueueCapacity) {
this.transactions = new HashMap<>();
this.executor = Threads.newSingleThreadExecutor(OracleConnector.class, logicalName, "transactional-buffer");
final BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(inCommitQueueCapacity);
executor = new ThreadPoolExecutor(1, 1,
Integer.MAX_VALUE, TimeUnit.MILLISECONDS,
workQueue,
Threads.threadFactory(OracleConnector.class, logicalName, "transactional-buffer", true),
new ThreadPoolExecutor.CallerRunsPolicy());
commitQueueCapacity = workQueue::remainingCapacity;
this.taskCounter = new AtomicInteger();
this.errorHandler = errorHandler;
this.metrics = metrics;
Expand Down Expand Up @@ -142,8 +154,11 @@ void registerCommitCallback(String transactionId, BigDecimal scn, Instant change
}

/**
* If the commit executor queue is full, back-pressure will be applied by letting execution of the callback
* be performed by the calling thread.
*
* @param transactionId transaction identifier
* @param scn SCN of the commit.
* @param scn SCN of the commit.
* @param offsetContext Oracle offset
* @param timestamp commit timestamp
* @param context context to check that source is running
Expand Down Expand Up @@ -187,19 +202,21 @@ boolean commit(String transactionId, BigDecimal scn, OracleOffsetContext offsetC
}

lastCommittedScn = new BigDecimal(scn.longValue());
metrics.incrementCommittedTransactions();
metrics.setActiveTransactions(transactions.size());
metrics.incrementCommittedDmlCounter(commitCallbacks.size());
metrics.setCommittedScn(scn.longValue());
} catch (InterruptedException e) {
LogMinerHelper.logError(metrics, "Thread interrupted during running", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
errorHandler.setProducerThrowable(e);
} finally {
metrics.incrementCommittedTransactions();
metrics.setActiveTransactions(transactions.size());
metrics.incrementCommittedDmlCounter(commitCallbacks.size());
metrics.setCommittedScn(scn.longValue());
metrics.setCommitQueueCapacity(commitQueueCapacity.get());
taskCounter.decrementAndGet();
}
});
metrics.setCommitQueueCapacity(commitQueueCapacity.get());

return true;
}
Expand Down Expand Up @@ -325,9 +342,9 @@ public interface CommitCallback {
/**
* Executes callback.
*
* @param timestamp commit timestamp
* @param smallestScn smallest SCN among other transactions
* @param commitScn commit SCN
* @param timestamp commit timestamp
* @param smallestScn smallest SCN among other transactions
* @param commitScn commit SCN
* @param callbackNumber number of the callback in the transaction
*/
void execute(Timestamp timestamp, BigDecimal smallestScn, BigDecimal commitScn, int callbackNumber) throws InterruptedException;
Expand Down
Expand Up @@ -30,6 +30,7 @@ public class TransactionalBufferMetrics extends Metrics implements Transactional
private AtomicLong committedTransactions = new AtomicLong();
private AtomicLong capturedDmlCounter = new AtomicLong();
private AtomicLong committedDmlCounter = new AtomicLong();
private AtomicInteger commitQueueCapacity = new AtomicInteger();
private AtomicReference<Duration> maxLagFromTheSource = new AtomicReference<>();
private AtomicReference<Duration> minLagFromTheSource = new AtomicReference<>();
private AtomicReference<Duration> averageLagsFromTheSource = new AtomicReference<>();
Expand Down Expand Up @@ -230,6 +231,15 @@ public int getScnFreezeCounter() {
return scnFreezeCounter.get();
}

@Override
public int getCommitQueueCapacity() {
return commitQueueCapacity.get();
}

void setCommitQueueCapacity(int commitQueueCapacity) {
this.commitQueueCapacity.set(commitQueueCapacity);
}

@Override
public void reset() {
maxLagFromTheSource.set(Duration.ZERO);
Expand All @@ -246,6 +256,7 @@ public void reset() {
errorCounter.set(0);
warningCounter.set(0);
scnFreezeCounter.set(0);
commitQueueCapacity.set(0);
}

@Override
Expand All @@ -266,6 +277,7 @@ public String toString() {
", errorCounter=" + errorCounter.get() +
", warningCounter=" + warningCounter.get() +
", scnFreezeCounter=" + scnFreezeCounter.get() +
", commitQueueCapacity=" + commitQueueCapacity.get() +
'}';
}
}
Expand Up @@ -42,6 +42,7 @@ public interface TransactionalBufferMetricsMXBean {

/**
* exposes total number of captured DMLs
*
* @return captured DML count
*/
long getCapturedDmlCount();
Expand All @@ -62,64 +63,84 @@ public interface TransactionalBufferMetricsMXBean {

/**
* It shows last committed SCN
*
* @return committed SCN
*/
Long getCommittedScn();

/**
* This is to get the lag between latest captured change timestamp in REDO LOG and time of it's placement in the buffer
*
* @return lag in milliseconds
*/
long getLagFromSource();

/**
* This is to get max value of the time difference between logging of source DB records into redo log and capturing it by Log Miner
*
* @return value in milliseconds
*/
long getMaxLagFromSource();

/**
* This is to get min value of the time difference between logging of source DB records into redo log and capturing it by Log Miner
*
* @return value in milliseconds
*/
long getMinLagFromSource();

/**
* This is to get average value of the time difference between logging of source DB records into redo log and capturing it by Log Miner.
* Average is calculated as summary of all lags / number of captured DB changes
*
* @return value in milliseconds
*/
long getAverageLagFromSource();

/**
* This is to get list of removed transactions from the Transactional Buffer
* @return count
*
* @return count abandoned transaction ids
*/
Set<String> getAbandonedTransactionIds();

/**
* See which transactions were rolled back
*
* @return set of transaction IDs
*/
Set<String> getRolledBackTransactionIds();

/**
* Gets commit queue capacity. As the queue fills up, this reduces to zero
*
* @return the commit queue capacity
*/
int getCommitQueueCapacity();

/**
* action to reset some metrics
*/
void reset();

/**
* This is to get logged logError counter.
*
* @return the error counter
*/
int getErrorCounter();

/**
* This is to get logged warning counter
*
* @return the warning counter
*/
int getWarningCounter();

/**
* Get counter of encountered observations when SCN does not change in the offset.
*
* @return the scn freeze counter
*/
int getScnFreezeCounter();
}

0 comments on commit c76688c

Please sign in to comment.