
Merge pull request debezium#51 in N4FRA/debezium from ARGO-209312_DBC_Oracle_RAC to master

Squashed commit of the following:

commit 44bc04142f38740940ef247540e1b370d1754e11
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed May 13 16:34:20 2020 -0700

    ARGO-209312, DB connector on Oracle RAC

commit 34e10b92b354dbc3804f4c9ed90eb27a9d8f5fa4
Merge: c953356a 66c207f
Author: AndreyIg <gnyiny@gmail.com>
Date:   Tue May 12 11:18:25 2020 -0700

    Merge branch 'master' into DSCON-301_DBC_Crashed_after_hours_downtime

commit 66c207f
Merge: f1810b9 d5de8d8
Author: AndreyIg <gnyiny@gmail.com>
Date:   Tue May 12 11:17:49 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit c953356a2b5aed88846be8be41f97249f0f96570
Author: AndreyIg <gnyiny@gmail.com>
Date:   Tue May 12 09:51:42 2020 -0700

    DSCON-301, DBC Crashed after 2 hours of downtime

commit 3ec1c0bc00d9f7e9737292660cf7f4989b26888a
Merge: bccb2c81 f1810b9
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu May 7 15:00:23 2020 -0700

    Merge branch 'master' into DSCON-268_manage_millisecondToSleepBetweenMiningQuery

commit f1810b9
Merge: 3f0c9de a043ebd
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu May 7 14:59:46 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit bccb2c81683362ece4dcc8bd162dba3144ced410
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu May 7 14:56:49 2020 -0700

    DSCON-268, manage millisecondToSleepBetweenMiningQuery value

commit ac591b18fd5b1f5def9126e1ac25ee4c6c5649df
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu May 7 14:54:30 2020 -0700

    DSCON-268, manage millisecondToSleepBetweenMiningQuery value

commit 6abfd55c1a6e740dd22c0977c42d1baa834dc47f
Merge: 38ac1d4d 3f0c9de
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu May 7 14:52:44 2020 -0700

    Merge branch 'master' into DSCON-268_manage_millisecondToSleepBetweenMiningQuery

commit 3f0c9de
Merge: 2de0428 5481902
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu May 7 14:52:06 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 38ac1d4d0ae19bcfcc1e110d509e11606f5ff668
Merge: c4d2db35 2de0428
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon May 4 11:46:18 2020 -0700

    Merge branch 'master' into DSCON-262_Analyse_address_parsing_errors

commit 2de0428
Merge: b514c5e 274e969
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon May 4 11:46:06 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit c4d2db3592eb492890039726eb1ad5889b5dfa1b
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon May 4 11:43:43 2020 -0700

    DSCON-262, Analyse and address parsing errors

commit c8db7e91013df5e2baf22ce981cc0ff9577eb4cf
Merge: d29c1e89 b514c5e
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon May 4 11:42:37 2020 -0700

    Merge branch 'master' into DSCON-262_Analyse_address_parsing_errors

commit b514c5e
Merge: babd47d 2b016d7
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon May 4 11:41:54 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit d29c1e892800143f201bee4f38eaae08e821e0fe
Author: AndreyIg <gnyiny@gmail.com>
Date:   Tue Apr 28 14:12:27 2020 -0700

    DSCON-241,
    supplemental logging on the table level not in synch

commit cf3d0870d7f3dbd180af2a9455facd4df4224e0e
Merge: 9cabba11 babd47d
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Apr 20 12:49:59 2020 -0700

    Merge branch 'master' into DSCON-251_create_table_for_flushing_LMWR_buffer

commit babd47d
Merge: e97206f a991266
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Apr 20 12:49:30 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 9cabba1176dd9c4c7c9004034db708fc13220460
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Apr 20 10:40:02 2020 -0700

    DSCON-251, create audit table as flushing mechanism

commit e97206f
Merge: 815ee19 c8742ae
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu Apr 9 14:43:43 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

... and 40 more commits
Ignatenko Andrey committed May 13, 2020
1 parent d5de8d8 commit 70ad303
Showing 5 changed files with 53 additions and 54 deletions.
LogMinerHelper.java
@@ -20,11 +20,13 @@
 import java.sql.Statement;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;

 /**
@@ -69,7 +71,7 @@ public static long getCurrentScn(Connection connection) throws SQLException {
         }
     }

-    public static void createAuditTable(Connection connection) throws SQLException{
+    public static void createAuditTable(Connection connection) throws SQLException {
         String tableExists = getStringResult(connection, SqlUtils.AUDIT_TABLE_EXISTS);
         if (tableExists == null) {
             executeCallableStatement(connection, SqlUtils.CREATE_AUDIT_TABLE);
@@ -161,27 +163,27 @@ static void startOnlineMining(Connection connection, Long startScn, Long endScn,
     }

     /**
-     * This method query the database to get CURRENT online redo log file
+     * This method queries the database to get the CURRENT online redo log file(s). Multiple files apply on RAC systems.
      * @param connection connection to reuse
      * @param metrics MBean accessible metrics
-     * @return full redo log file name, including path
+     * @return full redo log file name(s), including path
      * @throws SQLException if anything unexpected happens
      */
-    static String getCurrentRedoLogFile(Connection connection, LogMinerMetrics metrics) throws SQLException {
+    static Set<String> getCurrentRedoLogFiles(Connection connection, LogMinerMetrics metrics) throws SQLException {
         String checkQuery = SqlUtils.CURRENT_REDO_LOG_NAME;

-        String fileName = "";
+        Set<String> fileNames = new HashSet<>();
         PreparedStatement st = connection.prepareStatement(checkQuery);
         ResultSet result = st.executeQuery();
         while (result.next()) {
-            fileName = result.getString(1);
-            LOGGER.trace(" Current Redo log fileName: {} ", fileName);
+            fileNames.add(result.getString(1));
+            LOGGER.trace(" Current Redo log fileName: {} ", fileNames);
         }
         st.close();
         result.close();

-        metrics.setCurrentLogFileName(fileName);
-        return fileName;
+        metrics.setCurrentLogFileName(fileNames);
+        return fileNames;
     }

     /**
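Why the return type changed: on Oracle RAC each instance writes its own redo thread, and every thread has its own CURRENT log group, so the CURRENT_REDO_LOG_NAME query can return several members at once. A minimal plain-JDBC sketch of that behavior (the URL and credentials are placeholders, not part of this commit):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashSet;
import java.util.Set;

// Sketch only: runs the same query as CURRENT_REDO_LOG_NAME and shows why the
// result must be a collection on RAC (one CURRENT member per redo thread).
public class CurrentRedoLogCheck {
    public static void main(String[] args) throws Exception {
        String query = "select f.member from v$log log, v$logfile f "
                + "where log.group#=f.group# and log.status='CURRENT'";
        try (Connection conn = DriverManager.getConnection(
                "jdbc:oracle:thin:@//db-host:1521/ORCLPDB1", "dbzuser", "dbz"); // placeholders
             PreparedStatement st = conn.prepareStatement(query);
             ResultSet rs = st.executeQuery()) {
            Set<String> fileNames = new HashSet<>();
            while (rs.next()) {
                fileNames.add(rs.getString(1));
            }
            // Single instance: one member. Two-node RAC: two members, one per thread.
            System.out.println(fileNames);
        }
    }
}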
@@ -270,7 +272,7 @@ static void checkSupplementalLogging(OracleConnection jdbcConnection, Connection
             String validateGlobalLogging = "SELECT '" + key + "', " + " SUPPLEMENTAL_LOG_DATA_ALL from V$DATABASE";
             Map<String, String> globalLogging = getMap(connection, validateGlobalLogging, UNKNOWN);
             if ("no".equalsIgnoreCase(globalLogging.get(key))) {
-                throw new RuntimeException("Supplemental logging was not set");
+                throw new RuntimeException("Supplemental logging was not set. Use command: ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
             }
         } finally {
             if (pdbName != null) {
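The sharpened message now carries its own remedy. A hedged JDBC sketch of the matching check-and-fix, assuming a session with the ALTER DATABASE privilege (the helper below is illustrative and not part of this commit; on a CDB, run it against the appropriate container):

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Illustrative helper (assumed name): checks the same V$DATABASE flag the
// connector validates, then enables supplemental logging if it is off.
static void ensureSupplementalLogging(Connection conn) throws SQLException {
    boolean enabled;
    try (Statement st = conn.createStatement();
         ResultSet rs = st.executeQuery("SELECT SUPPLEMENTAL_LOG_DATA_ALL FROM V$DATABASE")) {
        enabled = rs.next() && "YES".equalsIgnoreCase(rs.getString(1));
    }
    if (!enabled) {
        try (Statement st = conn.createStatement()) {
            st.execute("ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
        }
    }
}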
@@ -356,9 +358,8 @@ static Optional<Long> getLastScnFromTheOldestOnlineRedo(Connection connection, L
      * @param connection connection
      * @return size
      */
-    static int getRedoLogGroupSize(Connection connection) throws SQLException {
-        Map<String, String> allOnlineRedoLogFiles = getMap(connection, SqlUtils.ALL_ONLINE_LOGS, "-1");
-        return allOnlineRedoLogFiles.size();
+    private static int getRedoLogGroupSize(Connection connection) throws SQLException {
+        return getMap(connection, SqlUtils.ALL_ONLINE_LOGS, "-1").size();
     }

     /**
LogMinerMetrics.java
@@ -12,6 +12,7 @@
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -24,7 +25,7 @@
 public class LogMinerMetrics extends Metrics implements LogMinerMetricsMXBean {
     private AtomicLong currentScn = new AtomicLong();
     private AtomicInteger capturedDmlCount = new AtomicInteger();
-    private AtomicReference<String> currentLogFileName;
+    private AtomicReference<String[]> currentLogFileName;
     private AtomicReference<String[]> redoLogStatus;
     private AtomicInteger switchCounter = new AtomicInteger();
     private AtomicReference<Duration> lastLogMinerQueryDuration = new AtomicReference<>();
@@ -35,17 +36,25 @@ public class LogMinerMetrics extends Metrics implements LogMinerMetricsMXBean {
     private AtomicReference<Duration> averageProcessedCapturedBatchDuration = new AtomicReference<>();
     private AtomicInteger maxBatchSize = new AtomicInteger();
     private AtomicInteger millisecondToSleepBetweenMiningQuery = new AtomicInteger();
-    private AtomicInteger fetchedRecordSizeToSleepMore = new AtomicInteger(); // todo delete later
+    private AtomicInteger fetchedRecordSizeToSleepMore = new AtomicInteger();

     private final int MAX_SLEEP_TIME = 3_000;
+    private final int DEFAULT_SLEEP_TIME = 1_000;
     private final int MIN_SLEEP_TIME = 100;
-    private final int BATCH_SIZE = 10_000;
+
+    private final int MIN_BATCH_SIZE = 100;
+    private final int MAX_BATCH_SIZE = 100_000;
+    private final int DEFAULT_BATCH_SIZE = 10_000;
+
+    private final int SLEEP_TIME_INCREMENT = 200;
+    private final int SIZE_TO_SLEEP_LONGER = 50;
+
     LogMinerMetrics(CdcSourceTaskContext taskContext) {
         super(taskContext, "log-miner");

-        maxBatchSize.set(BATCH_SIZE);
-        millisecondToSleepBetweenMiningQuery.set(1000);
-        fetchedRecordSizeToSleepMore.set(50);
+        maxBatchSize.set(DEFAULT_BATCH_SIZE);
+        millisecondToSleepBetweenMiningQuery.set(DEFAULT_SLEEP_TIME);
+        fetchedRecordSizeToSleepMore.set(SIZE_TO_SLEEP_LONGER);

         currentScn.set(-1);
         capturedDmlCount.set(0);
@@ -69,8 +78,8 @@ public void setCapturedDmlCount(int count){
         capturedDmlCount.set(count);
     }

-    public void setCurrentLogFileName(String name){
-        currentLogFileName.set(name);
+    public void setCurrentLogFileName(Set<String> names){
+        currentLogFileName.set(names.stream().toArray(String[]::new));
     }

     public void setRedoLogStatus(Map<String, String> status){
@@ -102,7 +111,7 @@ public int getCapturedDmlCount() {
     }

     @Override
-    public String getCurrentRedoLogFileName() {
+    public String[] getCurrentRedoLogFileName() {
         return currentLogFileName.get();
     }

@@ -164,7 +173,7 @@ public int getFetchedRecordSizeToSleepMore() {
     // MBean accessible setters
     @Override
     public void setMaxBatchSize(int size) {
-        if (size >= 100 && size <= 20_000) {
+        if (size >= MIN_BATCH_SIZE && size <= MAX_BATCH_SIZE) {
             maxBatchSize.set(size);
         }
     }
@@ -180,22 +189,15 @@ public void setMillisecondToSleepBetweenMiningQuery(Integer milliseconds) {
     public void incrementSleepingTime() {
         int sleepTime = millisecondToSleepBetweenMiningQuery.get();
         if (sleepTime >= MIN_SLEEP_TIME && sleepTime < MAX_SLEEP_TIME){
-            millisecondToSleepBetweenMiningQuery.getAndAdd(200);
+            millisecondToSleepBetweenMiningQuery.getAndAdd(SLEEP_TIME_INCREMENT);
         }
     }

     @Override
     public void resetSleepingTime() {
         int sleepTime = millisecondToSleepBetweenMiningQuery.get();
         if (sleepTime >= MIN_SLEEP_TIME && sleepTime < MAX_SLEEP_TIME){
-            millisecondToSleepBetweenMiningQuery.set(100);
-        }
-    }
-
-    @Override
-    public void setFetchedRecordSizeToSleepMore(int size) {
-        if (size >= 50 && size <= 200) {
-            fetchedRecordSizeToSleepMore.set(size);
+            millisecondToSleepBetweenMiningQuery.set(MIN_SLEEP_TIME);
         }
     }

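With named constants the sleep controller is easier to read: the interval starts at DEFAULT_SLEEP_TIME, grows by SLEEP_TIME_INCREMENT per quiet cycle, and drops back to MIN_SLEEP_TIME, with both moves guarded by the [MIN_SLEEP_TIME, MAX_SLEEP_TIME) window. A standalone sketch of exactly that logic (constants copied from this diff; the class is illustrative):

// Standalone sketch of the clamped sleep controller above (illustrative class).
public class SleepController {
    private static final int MAX_SLEEP_TIME = 3_000;
    private static final int DEFAULT_SLEEP_TIME = 1_000;
    private static final int MIN_SLEEP_TIME = 100;
    private static final int SLEEP_TIME_INCREMENT = 200;

    private int sleepMs = DEFAULT_SLEEP_TIME;

    void increment() {
        if (sleepMs >= MIN_SLEEP_TIME && sleepMs < MAX_SLEEP_TIME) {
            sleepMs += SLEEP_TIME_INCREMENT;  // back off while there is little to do
        }
    }

    void reset() {
        if (sleepMs >= MIN_SLEEP_TIME && sleepMs < MAX_SLEEP_TIME) {
            sleepMs = MIN_SLEEP_TIME;         // poll aggressively when lag builds up
        }
    }

    public static void main(String[] args) {
        SleepController c = new SleepController();
        for (int i = 0; i < 12; i++) {
            c.increment();                    // 1000 -> 1200 -> ... -> 3000, then stays
        }
        System.out.println(c.sleepMs);        // 3000
        c.reset();
        System.out.println(c.sleepMs);        // still 3000: the guard skips reset at MAX
    }
}

Note that, with the guard exactly as committed, a value that has already reached MAX_SLEEP_TIME is not reset either; whether that is intended is worth a second look.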
LogMinerMetricsMXBean.java
@@ -22,7 +22,7 @@ public interface LogMinerMetricsMXBean {
      *
      * @return full path or NULL if an exception occurs.
      */
-    String getCurrentRedoLogFileName();
+    String[] getCurrentRedoLogFileName();

     /**
      * Exposes states of redo logs: current, active, inactive, unused ...
@@ -105,12 +105,6 @@ public interface LogMinerMetricsMXBean {
      */
     int getFetchedRecordSizeToSleepMore();

-    /**
-     * sets the limit of fetched records from Log Miner view.
-     * @param size number of records
-     */
-    void setFetchedRecordSizeToSleepMore(int size);
-
     /**
      * set sleeping time larger
      */
LogMinerStreamingChangeEventSource.java
@@ -32,6 +32,7 @@
 import java.time.Instant;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;

 /**
@@ -95,6 +96,7 @@ public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig,
     @Override
     public void execute(ChangeEventSourceContext context) {
         Metronome metronome;
+        int processedCount = logMinerMetrics.getFetchedRecordSizeToSleepMore();

         // The top outer loop gives the resiliency on the network disconnections. This is critical for cloud deployment.
         while(context.isRunning()) {
@@ -126,7 +128,7 @@ public void execute(ChangeEventSourceContext context) {
                     LogMinerHelper.setRedoLogFilesForMining(connection, startScn);
                 }
                 LogMinerHelper.updateLogMinerMetrics(connection, logMinerMetrics);
-                String currentRedoLogFile = LogMinerHelper.getCurrentRedoLogFile(connection, logMinerMetrics);
+                Set<String> currentRedoLogFiles = LogMinerHelper.getCurrentRedoLogFiles(connection, logMinerMetrics);

                 LogMinerQueryResultProcessor processor = new LogMinerQueryResultProcessor(context, logMinerMetrics, transactionalBuffer,
                         dmlParser, offsetContext, schema, dispatcher, transactionalBufferMetrics, catalogName, clock);
@@ -136,8 +138,9 @@ public void execute(ChangeEventSourceContext context) {
                     metronome = Metronome.sleeper(Duration.ofMillis(logMinerMetrics.getMillisecondToSleepBetweenMiningQuery()), clock);

                     endScn = LogMinerHelper.getNextScn(connection, startScn, logMinerMetrics);
+
                     // adjust sleeping time to optimize DB impact and catchup faster when lag is large
-                    if (transactionalBufferMetrics.getLagFromSource() < 2000) {
+                    if (transactionalBufferMetrics.getLagFromSource() < 2000 && processedCount < logMinerMetrics.getFetchedRecordSizeToSleepMore()) {
                         logMinerMetrics.incrementSleepingTime();
                     }
                     if (transactionalBufferMetrics.getLagFromSource() > 10_000) {
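Together with the new processedCount field, the pacing rule reads: back off only when replication lag is low and the previous mining batch was small; a large lag always snaps the interval back to the minimum. A compact restatement (thresholds copied from this diff; the standalone helper is illustrative, not part of the commit):

// Illustrative restatement of the pacing rule above (not part of the commit).
static void adjustSleep(LogMinerMetrics metrics, long lagFromSourceMs, int processedCount) {
    boolean lowLag = lagFromSourceMs < 2000;
    boolean smallBatch = processedCount < metrics.getFetchedRecordSizeToSleepMore();
    if (lowLag && smallBatch) {
        metrics.incrementSleepingTime();  // quiet system: query LogMiner less often
    }
    if (lagFromSourceMs > 10_000) {
        metrics.resetSleepingTime();      // falling behind: return to the minimum sleep
    }
}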
@@ -147,10 +150,10 @@ public void execute(ChangeEventSourceContext context) {
                     metronome.pause();

                     LOGGER.trace("startScn: {}, endScn: {}", startScn, endScn);
-                    String possibleNewCurrentLogFile = LogMinerHelper.getCurrentRedoLogFile(connection, logMinerMetrics);
+                    Set<String> possibleNewCurrentLogFile = LogMinerHelper.getCurrentRedoLogFiles(connection, logMinerMetrics);

-                    if (!currentRedoLogFile.equals(possibleNewCurrentLogFile)) {
-                        LOGGER.debug("\n\n***** SWITCH occurred *****\n" + " from:{} , to:{} \n\n", currentRedoLogFile, possibleNewCurrentLogFile);
+                    if (!currentRedoLogFiles.equals(possibleNewCurrentLogFile)) {
+                        LOGGER.debug("\n\n***** SWITCH occurred *****\n" + " from:{} , to:{} \n\n", currentRedoLogFiles, possibleNewCurrentLogFile);

                         // This is the way to mitigate PGA leak.
                         // With one mining session it grows and maybe there is another way to flush PGA, but at this point we use new mining session
@@ -166,7 +169,7 @@ public void execute(ChangeEventSourceContext context) {
                         }

                         LogMinerHelper.updateLogMinerMetrics(connection, logMinerMetrics);
-                        currentRedoLogFile = LogMinerHelper.getCurrentRedoLogFile(connection, logMinerMetrics);
+                        currentRedoLogFiles = LogMinerHelper.getCurrentRedoLogFiles(connection, logMinerMetrics);
                     }

                     LogMinerHelper.startOnlineMining(connection, startScn, endScn, strategy, isContinuousMining);
@@ -178,7 +181,7 @@ public void execute(ChangeEventSourceContext context) {

                     ResultSet res = fetchFromMiningView.executeQuery();
                     logMinerMetrics.setLastLogMinerQueryDuration(Duration.between(startTime, Instant.now()));
-                    int processedCount = processor.processResult(res);
+                    processedCount = processor.processResult(res);

                     updateStartScn();

@@ -198,7 +201,7 @@ public void execute(ChangeEventSourceContext context) {
                     //     LogMinerHelper.endMining(connection);
                     //     LogMinerHelper.setRedoLogFilesForMining(connection, startScn);
                     //     LogMinerHelper.updateLogMinerMetrics(connection, logMinerMetrics);
-                    //     currentRedoLogFile = LogMinerHelper.getCurrentRedoLogFile(connection, logMinerMetrics);
+                    //     currentRedoLogFiles = LogMinerHelper.getCurrentRedoLogFiles(connection, logMinerMetrics);
                     // }
                 }
             } catch (Throwable e) {
SqlUtils.java
@@ -29,15 +29,14 @@ class SqlUtils {
             " + to_number(to_char(sys_extract_utc(systimestamp), 'SSSSSFF3'))) from dual";
     static final String END_LOGMNR = "BEGIN SYS.DBMS_LOGMNR.END_LOGMNR(); END;";
     static final String OLDEST_FIRST_CHANGE = "SELECT MIN(FIRST_CHANGE#) FROM V$LOG";
-    static final String ALL_ONLINE_LOGS = "SELECT MIN(F.MEMBER) AS FILE_NAME, L.NEXT_CHANGE# AS NEXT_CHANGE " +
-            " FROM V$LOG L, V$LOGFILE F " +
-            " WHERE F.GROUP# = L.GROUP# " +
-            " GROUP BY L.NEXT_CHANGE#" +
-            " ORDER BY 2";
+    static final String ALL_ONLINE_LOGS = "SELECT MIN(F.MEMBER) AS FILE_NAME, L.NEXT_CHANGE# AS NEXT_CHANGE, F.GROUP# " +
+            " FROM V$LOG L, V$LOGFILE F " +
+            " WHERE F.GROUP# = L.GROUP# AND L.NEXT_CHANGE# > 0 " +
+            " GROUP BY F.GROUP#, L.NEXT_CHANGE# ORDER BY 3";

     static final String REDO_LOGS_STATUS = "SELECT F.MEMBER, R.STATUS FROM V$LOGFILE F, V$LOG R WHERE F.GROUP# = R.GROUP# ORDER BY 2";
-    static final String SWITCH_HISTORY_TOTAL_COUNT = "select 'total', count(1) from v$archived_log where first_time > trunc(sysdate)\n" +
-            "and dest_id = (select dest_id from V$ARCHIVE_DEST_STATUS where status='VALID' and type='LOCAL')";
+    static final String SWITCH_HISTORY_TOTAL_COUNT = "select 'total', count(1) from v$archived_log where first_time > trunc(sysdate)" +
+            " and dest_id = (select dest_id from V$ARCHIVE_DEST_STATUS where status='VALID' and type='LOCAL')";
     static final String CURRENT_REDO_LOG_NAME = "select f.member from v$log log, v$logfile f where log.group#=f.group# and log.status='CURRENT'";
     static final String AUDIT_TABLE_EXISTS = "SELECT '1' AS ONE FROM USER_TABLES WHERE TABLE_NAME = '" + LOGMNR_AUDIT_TABLE + "'";
     static final String AUDIT_TABLE_RECORD_EXISTS = "SELECT '1' AS ONE FROM " + LOGMNR_AUDIT_TABLE;
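The rewritten ALL_ONLINE_LOGS keys its result by GROUP# and filters out groups with NEXT_CHANGE# = 0 (groups that have never been written), so every redo log group across all RAC threads appears exactly once with a usable SCN upper bound. A small JDBC sketch to inspect what the connector will see (URL and credentials are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Sketch only: prints one row per redo log group with its NEXT_CHANGE# bound,
// mirroring the revised ALL_ONLINE_LOGS query above.
public class OnlineLogsDump {
    public static void main(String[] args) throws Exception {
        String sql = "SELECT MIN(F.MEMBER) AS FILE_NAME, L.NEXT_CHANGE# AS NEXT_CHANGE, F.GROUP#"
                + " FROM V$LOG L, V$LOGFILE F"
                + " WHERE F.GROUP# = L.GROUP# AND L.NEXT_CHANGE# > 0"
                + " GROUP BY F.GROUP#, L.NEXT_CHANGE# ORDER BY 3";
        try (Connection conn = DriverManager.getConnection(
                "jdbc:oracle:thin:@//db-host:1521/ORCLPDB1", "dbzuser", "dbz"); // placeholders
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(sql)) {
            while (rs.next()) {
                System.out.printf("group=%d next_change=%d member=%s%n",
                        rs.getLong(3), rs.getLong(2), rs.getString(1));
            }
        }
    }
}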
