Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium
AndreyIg committed Jan 24, 2020
2 parents 276c19b + e68ada6 · commit 605d22f
Showing 3 changed files with 37 additions and 50 deletions.
@@ -215,7 +215,7 @@ private static int getSwitchCount(Connection connection) {
 private static void removeLogFileFromMining(String logFileName, Connection connection) throws SQLException {
     String removeLogFileFromMining = SqlUtils.getRemoveLogFileFromMiningStatement(logFileName);
     executeCallableStatement(connection, removeLogFileFromMining);
-    LOGGER.debug("{} was removed from mining", removeLogFileFromMining);
+    LOGGER.debug("{} was removed from the mining session", removeLogFileFromMining);
 
 }
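For reference, dropping a file from the current LogMiner session goes through Oracle's DBMS_LOGMNR package. A minimal sketch of the kind of statement SqlUtils.getRemoveLogFileFromMiningStatement presumably builds — the exact statement text is an assumption, but DBMS_LOGMNR.REMOVE_LOGFILE is the documented Oracle procedure:

    import java.sql.CallableStatement;
    import java.sql.Connection;
    import java.sql.SQLException;

    class RemoveLogFileSketch {
        // Assumed shape of the generated PL/SQL block; only LOGFILENAME is required.
        static void removeLogFile(Connection connection, String logFileName) throws SQLException {
            String sql = "BEGIN SYS.DBMS_LOGMNR.REMOVE_LOGFILE(LOGFILENAME => '" + logFileName + "'); END;";
            try (CallableStatement statement = connection.prepareCall(sql)) {
                statement.execute();
            }
        }
    }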

@@ -284,36 +284,27 @@ static void endMining(Connection connection) {
 }
 
 /**
- * This method substitutes CONTINUOUS_MINE functionality
+ * This method substitutes CONTINUOUS_MINE functionality for online files only
  * @param connection connection
  * @param lastProcessedScn current offset
- * @param currentLogFilesForMining list of files we are currently mining
  * @throws SQLException if anything unexpected happens
  */
-static void setRedoLogFilesForMining(Connection connection, Long lastProcessedScn, List<String> currentLogFilesForMining) throws SQLException {
+static void setRedoLogFilesForMining(Connection connection, Long lastProcessedScn) throws SQLException {
 
-    Map<String, Long> logFilesForMining = getCurrentlyMinedLogFiles(connection, lastProcessedScn);
+    Map<String, Long> logFilesForMining = getLogFilesForOffsetScn(connection, lastProcessedScn);
     if (logFilesForMining.isEmpty()) {
-        throw new IllegalStateException("The offset SCN is not in online REDO");
+        throw new IllegalStateException("The online log files do not contain offset SCN, re-snapshot is required.");
     }
+    List<String> logFilesNamesForMining = logFilesForMining.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toList());
 
-    LOGGER.debug("Last processed SCN: {}, List to mine: {}\n", lastProcessedScn, logFilesForMining);
-    List<String> outdatedFiles = currentLogFilesForMining.stream().filter(file -> !logFilesNamesForMining.contains(file)).collect(Collectors.toList());
-    for (String file : outdatedFiles) {
-        removeLogFileFromMining(file, connection);
-        LOGGER.debug("deleted outdated file {}", file);
-        currentLogFilesForMining.remove(file);
-    }
-    List<String> logFilesNamesForMining = logFilesForMining.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toList());
-
-    List<String> filesToAddForMining = logFilesNamesForMining.stream().filter(file -> !currentLogFilesForMining.contains(file)).collect(Collectors.toList());
-    for (String file : filesToAddForMining) {
+    for (String file : logFilesNamesForMining) {
         String addLogFileStatement = SqlUtils.getAddLogFileStatement("DBMS_LOGMNR.ADDFILE", file);
         executeCallableStatement(connection, addLogFileStatement);
-        LOGGER.debug("log file added = {}", file);
-        currentLogFilesForMining.add(file);
+        LOGGER.debug("add log file to the mining session = {}", file);
     }
-    LOGGER.debug("Current list to mine: {}\n", currentLogFilesForMining);
 
+    LOGGER.debug("Last mined SCN: {}, Log file list to mine: {}\n", lastProcessedScn, logFilesForMining);
 }
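The rewritten method drops the incremental bookkeeping around currentLogFilesForMining: it recomputes the full list of needed online log files on every call and registers each one. Judging by the "DBMS_LOGMNR.ADDFILE" option passed to SqlUtils.getAddLogFileStatement, each registration presumably executes Oracle's DBMS_LOGMNR.ADD_LOGFILE; a hedged sketch of that call (the statement text is an assumption):

    import java.sql.CallableStatement;
    import java.sql.Connection;
    import java.sql.SQLException;

    class AddLogFileSketch {
        // Assumed shape of the statement built by SqlUtils.getAddLogFileStatement:
        // the option constant (e.g. "DBMS_LOGMNR.ADDFILE") and the file name are
        // spliced into a PL/SQL block that registers the file with the session.
        static void addLogFile(Connection connection, String option, String fileName) throws SQLException {
            String sql = "BEGIN SYS.DBMS_LOGMNR.ADD_LOGFILE(LOGFILENAME => '" + fileName
                    + "', OPTIONS => " + option + "); END;";
            try (CallableStatement statement = connection.prepareCall(sql)) {
                statement.execute();
            }
        }
    }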

@@ -325,21 +316,25 @@ static void setRedoLogFilesForMining(Connection connection, Long lastProcessedScn) throws SQLException {
 /**
  * @return Optional last SCN in a redo log
  * @throws SQLException if anything unexpected happens
  */
-static Optional<Long> getLastScnFromTheOldestMiningRedo(Connection connection, Long lastProcessedScn) throws SQLException {
+static Optional<Long> getLastScnFromTheOldestOnlineRedo(Connection connection, Long lastProcessedScn) throws SQLException {
     Map<String, String> allOnlineRedoLogFiles = getMap(connection, SqlUtils.ALL_ONLINE_LOGS, "-1");
 
-    Map<String, Long> currentlyMinedLogFiles = getCurrentlyMinedLogFiles(connection, lastProcessedScn);
-    LOGGER.debug("Redo log size = {}, needed for mining files size = {}", allOnlineRedoLogFiles.size(), currentlyMinedLogFiles.size());
+    Map<String, Long> logFilesToMine = getLogFilesForOffsetScn(connection, lastProcessedScn);
+    LOGGER.debug("Redo log size = {}, needed for mining files size = {}", allOnlineRedoLogFiles.size(), logFilesToMine.size());
 
-    if (allOnlineRedoLogFiles.size() - currentlyMinedLogFiles.size() <= 1) {
-        List<Long> lastScnInOldestMinedRedo = currentlyMinedLogFiles.entrySet().stream().map(Map.Entry::getValue).collect(Collectors.toList());
-        return lastScnInOldestMinedRedo.stream().min(Long::compareTo);
+    if (allOnlineRedoLogFiles.size() - logFilesToMine.size() <= 1) {
+        List<Long> lastScnInOldestOnlineRedo = logFilesToMine.entrySet().stream().map(Map.Entry::getValue).collect(Collectors.toList());
+        return lastScnInOldestOnlineRedo.stream().min(Long::compareTo);
     }
     return Optional.empty();
 }
 
-// 18446744073709551615 on Ora 19c is the max value of the nextScn in the current redo todo replace all Long with BigDecimal for SCN
-private static Map<String, Long> getCurrentlyMinedLogFiles(Connection connection, Long offsetScn) throws SQLException {
+/**
+ * This method returns all online log files, starting from the one which contains the offset SCN and ending with the one containing the largest SCN.
+ * 18446744073709551615 on Ora 19c is the max value of the nextScn in the current redo. todo: replace all Long with BigDecimal for SCN
+ */
+private static Map<String, Long> getLogFilesForOffsetScn(Connection connection, Long offsetScn) throws SQLException {
     Map<String, String> redoLogFiles = getMap(connection, SqlUtils.ALL_ONLINE_LOGS, "-1");
     return redoLogFiles.entrySet().stream().
             filter(entry -> new BigDecimal(entry.getValue()).longValue() > offsetScn || new BigDecimal(entry.getValue()).longValue() == -1).
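The "== -1" branch of the filter above follows directly from the 18446744073709551615 note: Oracle 19c reports NEXT_CHANGE# of the current (still open) redo log as 2^64 - 1, which does not fit in a signed Java long. A small self-contained demo of why the todo asks for BigDecimal:

    import java.math.BigDecimal;

    class ScnOverflowDemo {
        public static void main(String[] args) {
            // NEXT_CHANGE# of the current redo log on Oracle 19c: 2^64 - 1.
            String nextScn = "18446744073709551615";

            // Long.parseLong(nextScn) would throw NumberFormatException,
            // because the value exceeds Long.MAX_VALUE (2^63 - 1).

            // BigDecimal holds it exactly; longValue() keeps the low 64 bits,
            // and for 2^64 - 1 those bits read back as -1. That is exactly the
            // sentinel the filter above matches with "== -1".
            long truncated = new BigDecimal(nextScn).longValue();
            System.out.println(truncated); // prints -1
        }
    }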
@@ -25,11 +25,8 @@
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.SQLException;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;

@@ -120,9 +117,8 @@ public void execute(ChangeEventSourceContext context) throws InterruptedException {
     LogMinerHelper.buildDataDictionary(connection);
 }
 
-List<String> filesToMine = new ArrayList<>();
 if (!isContinuousMining) {
-    LogMinerHelper.setRedoLogFilesForMining(connection, lastProcessedScn, filesToMine);
+    LogMinerHelper.setRedoLogFilesForMining(connection, lastProcessedScn);
 }
 LogMinerHelper.updateLogMinerMetrics(connection, logMinerMetrics);
 String currentRedoLogFile = LogMinerHelper.getCurrentRedoLogFile(connection, logMinerMetrics);
@@ -143,30 +139,24 @@ public void execute(ChangeEventSourceContext context) throws InterruptedException {
 LOGGER.debug("\n\n***** SWITCH occurred *****\n" + " from:{} , to:{} \n\n", currentRedoLogFile, possibleNewCurrentLogFile);
 
 // This is the way to mitigate PGA leak.
-// With one mining session it grows and maybe there is another way to flush PGA, but at this point we use new session
+// With one mining session it grows and maybe there is another way to flush PGA, but at this point we use a new mining session
 LogMinerHelper.endMining(connection);
 
 if (!isContinuousMining) {
-    filesToMine.clear();
     if (strategy == OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO) {
         LogMinerHelper.buildDataDictionary(connection);
     }
-    LogMinerHelper.setRedoLogFilesForMining(connection, lastProcessedScn, filesToMine);
-}
 
-// Abandon long running transactions
-Optional<Long> oldestScnToAbandonTransactions = LogMinerHelper.getLastScnFromTheOldestMiningRedo(connection, offsetContext.getScn());
-oldestScnToAbandonTransactions.ifPresent(scn -> {
-    transactionalBuffer.abandonLongTransactions(scn);
-    offsetContext.setScn(scn);
-    try {
-        nextScn = LogMinerHelper.getNextScn(connection, lastProcessedScn, logMinerMetrics);
-        LogMinerHelper.setRedoLogFilesForMining(connection, lastProcessedScn, filesToMine);
-    } catch (SQLException e) {
-        LOGGER.error("Couldn't abandon transactions due to {}", e);
-    }
-    LOGGER.debug("offset before: {}, offset after:{}, next SCN:{}", offsetContext.getScn(), scn, nextScn);
-});
+    // Abandon long running transactions
+    Optional<Long> oldestScnToAbandonTransactions = LogMinerHelper.getLastScnFromTheOldestOnlineRedo(connection, offsetContext.getScn());
+    oldestScnToAbandonTransactions.ifPresent(nextOldestScn -> {
+        transactionalBuffer.abandonLongTransactions(nextOldestScn);
+        offsetContext.setScn(nextOldestScn);
+        LOGGER.debug("After abandoning, offset before: {}, offset after:{}", offsetContext.getScn(), nextOldestScn);
+    });
 
+    LogMinerHelper.setRedoLogFilesForMining(connection, lastProcessedScn);
+}
 
 LogMinerHelper.updateLogMinerMetrics(connection, logMinerMetrics);

@@ -188,7 +178,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedException {
     metronome.pause();
 }
 
-// update SCN in offset context only if buffer is empty
+// update SCN in offset context only if buffer is empty, otherwise we update the offset in TransactionalBuffer
 if (transactionalBuffer.isEmpty()) {
     offsetContext.setScn(nextScn);
 }
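The replaced comment states the connector's restart-safety rule: while uncommitted transactions are still buffered, the offset must not move past them, or a restart would skip their events; only an empty buffer makes it safe to jump to nextScn. A minimal stand-alone sketch of that rule (Buffer and Offset are stand-ins, not connector types):

    class OffsetAdvanceSketch {
        // Stand-ins for the types visible in the diff (TransactionalBuffer and
        // the offset context); only the guard logic matters here.
        interface Buffer { boolean isEmpty(); }
        interface Offset { void setScn(long scn); }

        // Advance the restart position only when nothing is in flight; otherwise
        // TransactionalBuffer moves the offset itself as transactions commit.
        static void maybeAdvanceOffset(Buffer transactionalBuffer, Offset offsetContext, long nextScn) {
            if (transactionalBuffer.isEmpty()) {
                offsetContext.setScn(nextScn);
            }
        }
    }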
@@ -155,6 +155,7 @@ boolean commit(String transactionId, Timestamp timestamp, ChangeEventSource.Chan
  * In other words the connector will not send any part of this transaction to Kafka.
  * @param thresholdScn the smallest SCN of any transaction to keep in the buffer. All others will be removed.
  */
+// todo calculate smallest SCN as a new OffsetScn
 void abandonLongTransactions(Long thresholdScn) {
     BigDecimal threshold = new BigDecimal(thresholdScn);
     Iterator<Map.Entry<String, Transaction>> iter = transactions.entrySet().iterator();
@@ -195,6 +196,7 @@ boolean rollback(String transactionId, String debugMessage) {
     abandonedTransactionIds.remove(transactionId);
     metrics.ifPresent(m -> m.setActiveTransactions(transactions.size()));
     metrics.ifPresent(TransactionalBufferMetrics::incrementRolledBackTransactions);
+    calculateSmallestScn();
     return true;
 }
 return false;
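The body of abandonLongTransactions is cut off above at the iterator, so the following is a hedged reconstruction based only on the visible pieces — the BigDecimal threshold, the transactions iterator, and the abandonedTransactionIds set that rollback consults. The Transaction type here is a stand-in:

    import java.math.BigDecimal;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Set;

    class AbandonSketch {
        // Stand-in: only the transaction's first SCN matters for abandonment.
        static class Transaction {
            final BigDecimal firstScn;
            Transaction(long firstScn) { this.firstScn = new BigDecimal(firstScn); }
        }

        // Drop every transaction whose first SCN is below thresholdScn (the
        // smallest SCN to keep, per the javadoc) and remember its id, so that no
        // part of it is ever sent to Kafka and any late-arriving events for it
        // can be recognized and ignored.
        static void abandonLongTransactions(Map<String, Transaction> transactions,
                                            Set<String> abandonedTransactionIds,
                                            Long thresholdScn) {
            BigDecimal threshold = new BigDecimal(thresholdScn);
            Iterator<Map.Entry<String, Transaction>> iter = transactions.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<String, Transaction> entry = iter.next();
                if (entry.getValue().firstScn.compareTo(threshold) < 0) {
                    abandonedTransactionIds.add(entry.getKey());
                    iter.remove();
                }
            }
        }
    }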
