Skip to content

Commit

Permalink
Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreyIg committed May 13, 2020
2 parents 66c207f + 70ad303 commit 8b36431
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 54 deletions.
Expand Up @@ -20,11 +20,13 @@
import java.sql.Statement;
import java.time.Duration;
import java.time.Instant;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -69,7 +71,7 @@ public static long getCurrentScn(Connection connection) throws SQLException {
}
}

public static void createAuditTable(Connection connection) throws SQLException{
public static void createAuditTable(Connection connection) throws SQLException {
String tableExists = getStringResult(connection, SqlUtils.AUDIT_TABLE_EXISTS);
if (tableExists == null) {
executeCallableStatement(connection, SqlUtils.CREATE_AUDIT_TABLE);
Expand Down Expand Up @@ -161,27 +163,27 @@ static void startOnlineMining(Connection connection, Long startScn, Long endScn,
}

/**
* This method query the database to get CURRENT online redo log file
* This method query the database to get CURRENT online redo log file(s). Multiple is applicable for RAC systems.
* @param connection connection to reuse
* @param metrics MBean accessible metrics
* @return full redo log file name, including path
* @return full redo log file name(s), including path
* @throws SQLException if anything unexpected happens
*/
static String getCurrentRedoLogFile(Connection connection, LogMinerMetrics metrics) throws SQLException {
static Set<String> getCurrentRedoLogFiles(Connection connection, LogMinerMetrics metrics) throws SQLException {
String checkQuery = SqlUtils.CURRENT_REDO_LOG_NAME;

String fileName = "";
Set<String> fileNames = new HashSet<>();
PreparedStatement st = connection.prepareStatement(checkQuery);
ResultSet result = st.executeQuery();
while (result.next()) {
fileName = result.getString(1);
LOGGER.trace(" Current Redo log fileName: {} ", fileName);
fileNames.add(result.getString(1));
LOGGER.trace(" Current Redo log fileName: {} ", fileNames);
}
st.close();
result.close();

metrics.setCurrentLogFileName(fileName);
return fileName;
metrics.setCurrentLogFileName(fileNames);
return fileNames;
}

/**
Expand Down Expand Up @@ -270,7 +272,7 @@ static void checkSupplementalLogging(OracleConnection jdbcConnection, Connection
String validateGlobalLogging = "SELECT '" + key + "', " + " SUPPLEMENTAL_LOG_DATA_ALL from V$DATABASE";
Map<String, String> globalLogging = getMap(connection, validateGlobalLogging, UNKNOWN);
if ("no".equalsIgnoreCase(globalLogging.get(key))) {
throw new RuntimeException("Supplemental logging was not set");
throw new RuntimeException("Supplemental logging was not set. Use command: ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
}
} finally {
if (pdbName != null) {
Expand Down Expand Up @@ -356,9 +358,8 @@ static Optional<Long> getLastScnFromTheOldestOnlineRedo(Connection connection, L
* @param connection connection
* @return size
*/
static int getRedoLogGroupSize(Connection connection) throws SQLException {
Map<String, String> allOnlineRedoLogFiles = getMap(connection, SqlUtils.ALL_ONLINE_LOGS, "-1");
return allOnlineRedoLogFiles.size();
private static int getRedoLogGroupSize(Connection connection) throws SQLException {
return getMap(connection, SqlUtils.ALL_ONLINE_LOGS, "-1").size();
}

/**
Expand Down
Expand Up @@ -12,6 +12,7 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -24,7 +25,7 @@
public class LogMinerMetrics extends Metrics implements LogMinerMetricsMXBean {
private AtomicLong currentScn = new AtomicLong();
private AtomicInteger capturedDmlCount = new AtomicInteger();
private AtomicReference<String> currentLogFileName;
private AtomicReference<String[]> currentLogFileName;
private AtomicReference<String[]> redoLogStatus;
private AtomicInteger switchCounter = new AtomicInteger();
private AtomicReference<Duration> lastLogMinerQueryDuration = new AtomicReference<>();
Expand All @@ -35,17 +36,25 @@ public class LogMinerMetrics extends Metrics implements LogMinerMetricsMXBean {
private AtomicReference<Duration> averageProcessedCapturedBatchDuration = new AtomicReference<>();
private AtomicInteger maxBatchSize = new AtomicInteger();
private AtomicInteger millisecondToSleepBetweenMiningQuery = new AtomicInteger();
private AtomicInteger fetchedRecordSizeToSleepMore = new AtomicInteger(); // todo delete later
private AtomicInteger fetchedRecordSizeToSleepMore = new AtomicInteger();

private final int MAX_SLEEP_TIME = 3_000;
private final int DEFAULT_SLEEP_TIME = 1_000;
private final int MIN_SLEEP_TIME = 100;
private final int BATCH_SIZE = 10_000;

private final int MIN_BATCH_SIZE = 100;
private final int MAX_BATCH_SIZE = 100_000;
private final int DEFAULT_BATCH_SIZE = 10_000;

private final int SLEEP_TIME_INCREMENT = 200;
private final int SIZE_TO_SLEEP_LONGER = 50;

LogMinerMetrics(CdcSourceTaskContext taskContext) {
super(taskContext, "log-miner");

maxBatchSize.set(BATCH_SIZE);
millisecondToSleepBetweenMiningQuery.set(1000);
fetchedRecordSizeToSleepMore.set(50);
maxBatchSize.set(DEFAULT_BATCH_SIZE);
millisecondToSleepBetweenMiningQuery.set(DEFAULT_SLEEP_TIME);
fetchedRecordSizeToSleepMore.set(SIZE_TO_SLEEP_LONGER);

currentScn.set(-1);
capturedDmlCount.set(0);
Expand All @@ -69,8 +78,8 @@ public void setCapturedDmlCount(int count){
capturedDmlCount.set(count);
}

public void setCurrentLogFileName(String name){
currentLogFileName.set(name);
public void setCurrentLogFileName(Set<String> names){
currentLogFileName.set(names.stream().toArray(String[]::new));
}

public void setRedoLogStatus(Map<String, String> status){
Expand Down Expand Up @@ -102,7 +111,7 @@ public int getCapturedDmlCount() {
}

@Override
public String getCurrentRedoLogFileName() {
public String[] getCurrentRedoLogFileName() {
return currentLogFileName.get();
}

Expand Down Expand Up @@ -164,7 +173,7 @@ public int getFetchedRecordSizeToSleepMore() {
// MBean accessible setters
@Override
public void setMaxBatchSize(int size) {
if (size >= 100 && size <= 20_000) {
if (size >= MIN_BATCH_SIZE && size <= MAX_BATCH_SIZE) {
maxBatchSize.set(size);
}
}
Expand All @@ -180,22 +189,15 @@ public void setMillisecondToSleepBetweenMiningQuery(Integer milliseconds) {
// Raises the inter-query sleep time by one increment, never exceeding MAX_SLEEP_TIME.
public void incrementSleepingTime() {
    int sleepTime = millisecondToSleepBetweenMiningQuery.get();
    if (sleepTime >= MIN_SLEEP_TIME && sleepTime < MAX_SLEEP_TIME) {
        // Clamp: a plain getAndAdd(SLEEP_TIME_INCREMENT) could overshoot the ceiling
        // (up to MAX_SLEEP_TIME + SLEEP_TIME_INCREMENT - 1) when sleepTime is just below MAX.
        millisecondToSleepBetweenMiningQuery.set(Math.min(sleepTime + SLEEP_TIME_INCREMENT, MAX_SLEEP_TIME));
    }
}

@Override
public void resetSleepingTime() {
    // Drop back to the minimum sleep time so mining catches up quickly once lag is detected.
    int sleepTime = millisecondToSleepBetweenMiningQuery.get();
    if (sleepTime >= MIN_SLEEP_TIME && sleepTime < MAX_SLEEP_TIME) {
        millisecondToSleepBetweenMiningQuery.set(MIN_SLEEP_TIME);
    }
}

Expand Down
Expand Up @@ -22,7 +22,7 @@ public interface LogMinerMetricsMXBean {
*
* @return full path or NULL if an exception occurs.
*/
String getCurrentRedoLogFileName();
String[] getCurrentRedoLogFileName();

/**
* Exposes states of redo logs: current, active, inactive, unused ...
Expand Down Expand Up @@ -105,12 +105,6 @@ public interface LogMinerMetricsMXBean {
*/
int getFetchedRecordSizeToSleepMore();

/**
* sets the limit of fetched records from Log Miner view.
* @param size number of records
*/
void setFetchedRecordSizeToSleepMore(int size);

/**
* set sleeping time larger
*/
Expand Down
Expand Up @@ -32,6 +32,7 @@
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand Down Expand Up @@ -95,6 +96,7 @@ public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig,
@Override
public void execute(ChangeEventSourceContext context) {
Metronome metronome;
int processedCount = logMinerMetrics.getFetchedRecordSizeToSleepMore();

// The top outer loop gives the resiliency on the network disconnections. This is critical for cloud deployment.
while(context.isRunning()) {
Expand Down Expand Up @@ -126,7 +128,7 @@ public void execute(ChangeEventSourceContext context) {
LogMinerHelper.setRedoLogFilesForMining(connection, startScn);
}
LogMinerHelper.updateLogMinerMetrics(connection, logMinerMetrics);
String currentRedoLogFile = LogMinerHelper.getCurrentRedoLogFile(connection, logMinerMetrics);
Set<String> currentRedoLogFiles = LogMinerHelper.getCurrentRedoLogFiles(connection, logMinerMetrics);

LogMinerQueryResultProcessor processor = new LogMinerQueryResultProcessor(context, logMinerMetrics, transactionalBuffer,
dmlParser, offsetContext, schema, dispatcher, transactionalBufferMetrics, catalogName, clock);
Expand All @@ -136,8 +138,9 @@ public void execute(ChangeEventSourceContext context) {
metronome = Metronome.sleeper(Duration.ofMillis(logMinerMetrics.getMillisecondToSleepBetweenMiningQuery()), clock);

endScn = LogMinerHelper.getNextScn(connection, startScn, logMinerMetrics);

// adjust sleeping time to optimize DB impact and catchup faster when lag is large
if (transactionalBufferMetrics.getLagFromSource() < 2000) {
if (transactionalBufferMetrics.getLagFromSource() < 2000 && processedCount < logMinerMetrics.getFetchedRecordSizeToSleepMore()) {
logMinerMetrics.incrementSleepingTime();
}
if (transactionalBufferMetrics.getLagFromSource() > 10_000) {
Expand All @@ -147,10 +150,10 @@ public void execute(ChangeEventSourceContext context) {
metronome.pause();

LOGGER.trace("startScn: {}, endScn: {}", startScn, endScn);
String possibleNewCurrentLogFile = LogMinerHelper.getCurrentRedoLogFile(connection, logMinerMetrics);
Set<String> possibleNewCurrentLogFile = LogMinerHelper.getCurrentRedoLogFiles(connection, logMinerMetrics);

if (!currentRedoLogFile.equals(possibleNewCurrentLogFile)) {
LOGGER.debug("\n\n***** SWITCH occurred *****\n" + " from:{} , to:{} \n\n", currentRedoLogFile, possibleNewCurrentLogFile);
if (!currentRedoLogFiles.equals(possibleNewCurrentLogFile)) {
LOGGER.debug("\n\n***** SWITCH occurred *****\n" + " from:{} , to:{} \n\n", currentRedoLogFiles, possibleNewCurrentLogFile);

// This is the way to mitigate PGA leak.
// With one mining session it grows and maybe there is another way to flush PGA, but at this point we use new mining session
Expand All @@ -166,7 +169,7 @@ public void execute(ChangeEventSourceContext context) {
}

LogMinerHelper.updateLogMinerMetrics(connection, logMinerMetrics);
currentRedoLogFile = LogMinerHelper.getCurrentRedoLogFile(connection, logMinerMetrics);
currentRedoLogFiles = LogMinerHelper.getCurrentRedoLogFiles(connection, logMinerMetrics);
}

LogMinerHelper.startOnlineMining(connection, startScn, endScn, strategy, isContinuousMining);
Expand All @@ -178,7 +181,7 @@ public void execute(ChangeEventSourceContext context) {

ResultSet res = fetchFromMiningView.executeQuery();
logMinerMetrics.setLastLogMinerQueryDuration(Duration.between(startTime, Instant.now()));
int processedCount = processor.processResult(res);
processedCount = processor.processResult(res);

updateStartScn();

Expand All @@ -198,7 +201,7 @@ public void execute(ChangeEventSourceContext context) {
// LogMinerHelper.endMining(connection);
// LogMinerHelper.setRedoLogFilesForMining(connection, startScn);
// LogMinerHelper.updateLogMinerMetrics(connection, logMinerMetrics);
// currentRedoLogFile = LogMinerHelper.getCurrentRedoLogFile(connection, logMinerMetrics);
// currentRedoLogFiles = LogMinerHelper.getCurrentRedoLogFiles(connection, logMinerMetrics);
// }
}
} catch (Throwable e) {
Expand Down
Expand Up @@ -29,15 +29,14 @@ class SqlUtils {
" + to_number(to_char(sys_extract_utc(systimestamp), 'SSSSSFF3'))) from dual";
static final String END_LOGMNR = "BEGIN SYS.DBMS_LOGMNR.END_LOGMNR(); END;";
static final String OLDEST_FIRST_CHANGE = "SELECT MIN(FIRST_CHANGE#) FROM V$LOG";
// One row per online redo log group (MIN(F.MEMBER) picks a single member file per group);
// groups with NEXT_CHANGE# = 0 are excluded, result ordered by GROUP#.
static final String ALL_ONLINE_LOGS = "SELECT MIN(F.MEMBER) AS FILE_NAME, L.NEXT_CHANGE# AS NEXT_CHANGE, F.GROUP# " +
        " FROM V$LOG L, V$LOGFILE F "+
        " WHERE F.GROUP# = L.GROUP# AND L.NEXT_CHANGE# > 0 "+
        " GROUP BY F.GROUP#, L.NEXT_CHANGE# ORDER BY 3";

static final String REDO_LOGS_STATUS = "SELECT F.MEMBER, R.STATUS FROM V$LOGFILE F, V$LOG R WHERE F.GROUP# = R.GROUP# ORDER BY 2";
// Counts today's archived-log switches at the local, valid archive destination.
static final String SWITCH_HISTORY_TOTAL_COUNT = "select 'total', count(1) from v$archived_log where first_time > trunc(sysdate)" +
        " and dest_id = (select dest_id from V$ARCHIVE_DEST_STATUS where status='VALID' and type='LOCAL')";
static final String CURRENT_REDO_LOG_NAME = "select f.member from v$log log, v$logfile f where log.group#=f.group# and log.status='CURRENT'";
static final String AUDIT_TABLE_EXISTS = "SELECT '1' AS ONE FROM USER_TABLES WHERE TABLE_NAME = '" + LOGMNR_AUDIT_TABLE + "'";
static final String AUDIT_TABLE_RECORD_EXISTS = "SELECT '1' AS ONE FROM " + LOGMNR_AUDIT_TABLE;
Expand Down

0 comments on commit 8b36431

Please sign in to comment.