Skip to content

Commit

Permalink
Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreyIg committed May 4, 2020
2 parents babd47d + 2b016d7 commit b514c5e
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 51 deletions.
Expand Up @@ -121,7 +121,7 @@ public static String removeApostrophes(String text){
/**
* this is to handle cases when a record contains escape character(s)
* @param text before parsing we replaced it with double escape, now revert it back
* @return
* @return string with double slashes
*/
public static String replaceDoubleBackSlashes(String text){
if (text != null && text.contains("\\\\")){
Expand Down
Expand Up @@ -8,7 +8,6 @@
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -19,12 +18,13 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -123,6 +123,21 @@ static void updateLogMinerMetrics(Connection connection, LogMinerMetrics metrics
}
}


/**
* Calculate time difference between database and connector
* @param connection connection
* @return difference in milliseconds
*/
static Long getTimeDifference(Connection connection) {
try {
Long dbCurrentMillis = getLongResult(connection, SqlUtils.CURRENT_MILLIS);
return Duration.between(Instant.now(), Instant.ofEpochMilli(dbCurrentMillis)).toMillis();
} catch (SQLException e) {
return 0L;
}
}

/**
* This method builds mining view to query changes from.
* This view is built for online redo log files.
Expand Down Expand Up @@ -239,45 +254,28 @@ private static void removeLogFileFromMining(String logFileName, Connection conne
}

/**
* This method checks if supplemental logging was set on the database level. If so it just logs this info.
* If database level supplemental logging was not set, the method checks if each table has it and set it.
* This method checks if supplemental logging was set on the database level. This is critical check, cannot work if not.
* @param jdbcConnection oracle connection on logminer level
* @param connection conn
* @param pdbName pdb name
* @param tableIds whitelisted tables
* @throws SQLException if anything unexpected happens
*/
static void setSupplementalLoggingForWhitelistedTables(OracleConnection jdbcConnection, Connection connection, String pdbName,
Set<TableId> tableIds) throws SQLException {
if (pdbName != null) {
jdbcConnection.setSessionToPdb(pdbName);
}

final String key = "KEY";
String validateGlobalLogging = "SELECT '" + key + "', " + " SUPPLEMENTAL_LOG_DATA_ALL from V$DATABASE";
Map<String, String> globalLogging = getMap(connection, validateGlobalLogging, UNKNOWN);
if ("no".equalsIgnoreCase(globalLogging.get(key))) {
tableIds.forEach(table -> {
String tableName = table.schema() + "." + table.table();
try {
String validateTableLevelLogging = String.format("SELECT '%s', LOG_GROUP_TYPE FROM DBA_LOG_GROUPS WHERE LOG_GROUP_TYPE='ALL COLUMN LOGGING' AND OWNER ='%s' AND TABLE_NAME = '%s'", key,
table.schema().toUpperCase(), table.table().toUpperCase());
Map<String, String> tableLogging = getMap(connection, validateTableLevelLogging, UNKNOWN);
if (tableLogging.get(key) == null) {
String alterTableStatement = "ALTER TABLE " + tableName + " ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS";
LOGGER.info("altering table {} for supplemental logging", table.table());
executeCallableStatement(connection, alterTableStatement);
}
} catch (SQLException e) {
throw new RuntimeException("Cannot set supplemental logging for table " + tableName, e);
}
});
} else {
LOGGER.warn("Supplemental logging is set on global level, setting individual table supplemental logging was skipped");
}
static void checkSupplementalLogging(OracleConnection jdbcConnection, Connection connection, String pdbName) throws SQLException {
try {
if (pdbName != null) {
jdbcConnection.setSessionToPdb(pdbName);
}

if (pdbName != null) {
jdbcConnection.resetSessionToCdb();
final String key = "KEY";
String validateGlobalLogging = "SELECT '" + key + "', " + " SUPPLEMENTAL_LOG_DATA_ALL from V$DATABASE";
Map<String, String> globalLogging = getMap(connection, validateGlobalLogging, UNKNOWN);
if ("no".equalsIgnoreCase(globalLogging.get(key))) {
throw new RuntimeException("Supplemental logging was not set");
}
} finally {
if (pdbName != null) {
jdbcConnection.resetSessionToCdb();
}
}
}

Expand Down Expand Up @@ -312,7 +310,12 @@ static void setRedoLogFilesForMining(Connection connection, Long lastProcessedSc

Map<String, Long> logFilesForMining = getLogFilesForOffsetScn(connection, lastProcessedScn);
if (logFilesForMining.isEmpty()) {
throw new IllegalStateException("The online log files do not contain offset SCN, re-snapshot is required.");
throw new IllegalStateException("The online log files do not contain offset SCN: " + lastProcessedScn + ", re-snapshot is required.");
}

int redoLogGroupSize = getRedoLogGroupSize(connection);
if (logFilesForMining.size() == redoLogGroupSize) {
throw new IllegalStateException("All online log files needed for mining the offset SCN: " + lastProcessedScn + ", re-snapshot is required.");
}

List<String> logFilesNamesForMining = logFilesForMining.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toList());
Expand Down Expand Up @@ -348,6 +351,15 @@ static Optional<Long> getLastScnFromTheOldestOnlineRedo(Connection connection, L
return Optional.empty();
}

/**
* get size of online REDO groups
* @param connection connection
* @return size
*/
static int getRedoLogGroupSize(Connection connection) throws SQLException {
Map<String, String> allOnlineRedoLogFiles = getMap(connection, SqlUtils.ALL_ONLINE_LOGS, "-1");
return allOnlineRedoLogFiles.size();
}

/**
* This method returns all online log files, starting from one which contains offset SCN and ending with one containing largest SCN
Expand Down Expand Up @@ -392,4 +404,13 @@ private static String getStringResult(Connection connection, String query) throw
}
}

private static Long getLongResult(Connection connection, String query) throws SQLException {
try (PreparedStatement statement = connection.prepareStatement(query);
ResultSet rs = statement.executeQuery()) {
if (rs.next()) {
return rs.getLong(1);
}
return System.currentTimeMillis();
}
}
}
Expand Up @@ -46,6 +46,9 @@ public class LogMinerQueryResultProcessor {
private final String catalogName;
private final Clock clock;
private final Logger LOGGER = LoggerFactory.getLogger(LogMinerQueryResultProcessor.class);
private long currentOffsetScn = 0;
private long currentOffsetCommitScn = 0;


public LogMinerQueryResultProcessor(ChangeEventSource.ChangeEventSourceContext context, LogMinerMetrics metrics,
TransactionalBuffer transactionalBuffer, SimpleDmlParser dmlParser,
Expand Down Expand Up @@ -150,7 +153,7 @@ public int processResult(ResultSet resultSet) {

LOGGER.trace("parsed record: {}" , rowLcr);
if (rowLcr == null || redo_sql == null) {
LOGGER.warn("Following statement was not parsed: {}, details: {}", redo_sql, logMessage);
LOGGER.trace("Following statement was not parsed: {}, details: {}", redo_sql, logMessage);
continue;
}

Expand Down Expand Up @@ -181,6 +184,14 @@ public int processResult(ResultSet resultSet) {
transactionalBufferMetrics.incrementUfvDelete();
}
}
if ("inv_wi".equalsIgnoreCase(tableName)) {
if (operationCode == 1) {
transactionalBufferMetrics.incrementWiInsert();
}
if (operationCode == 2) {
transactionalBufferMetrics.incrementWiDelete();
}
}

transactionalBuffer.registerCommitCallback(txId, scn, changeTime.toInstant(), redo_sql, (timestamp, smallestScn, commitScn, counter) -> {
// update SCN in offset context only if processed SCN less than SCN among other transactions
Expand Down Expand Up @@ -211,6 +222,11 @@ public int processResult(ResultSet resultSet) {
metrics.setProcessedCapturedBatchDuration(Duration.between(startTime, Instant.now()));
metrics.setCapturedDmlCount(dmlCounter);
if (dmlCounter > 0 || commitCounter > 0 || rollbackCounter > 0) {
if (currentOffsetScn == offsetContext.getScn() && currentOffsetCommitScn != offsetContext.getCommitScn()) {
LOGGER.warn("offset SCN {} did not change, the oldest transaction was not committed. Offset commit SCN: {}", currentOffsetScn, offsetContext.getCommitScn());
}
currentOffsetScn = offsetContext.getScn();
currentOffsetCommitScn = offsetContext.getCommitScn();
LOGGER.debug("{} DMLs, {} Commits, {} Rollbacks were processed in {} milliseconds, commit time:{}, rollback time: {}, parse time:{}, " +
"other time:{}, lag:{}, offset scn:{}, offset commit scn:{}, active transactions:{}",
dmlCounter, commitCounter, rollbackCounter, (Duration.between(startTime, Instant.now()).toMillis()),
Expand Down
Expand Up @@ -26,11 +26,13 @@
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

/**
* A {@link StreamingChangeEventSource} based on Oracle's LogMiner utility.
Expand Down Expand Up @@ -104,6 +106,8 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio

startScn = offsetContext.getScn();
LogMinerHelper.createAuditTable(connection);
LOGGER.trace("current millis {}, db time {}", System.currentTimeMillis(), LogMinerHelper.getTimeDifference(connection));
transactionalBufferMetrics.setTimeDifference(new AtomicLong(LogMinerHelper.getTimeDifference(connection)));

long oldestScnInOnlineRedo = LogMinerHelper.getFirstOnlineLogScn(connection);
if (startScn < oldestScnInOnlineRedo) {
Expand All @@ -112,7 +116,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio

// 1. Configure Log Miner to mine online redo logs
LogMinerHelper.setNlsSessionParameters(jdbcConnection);
LogMinerHelper.setSupplementalLoggingForWhitelistedTables(jdbcConnection, connection, connectorConfig.getPdbName(), schema.tableIds());
LogMinerHelper.checkSupplementalLogging(jdbcConnection, connection, connectorConfig.getPdbName());

LOGGER.debug("Data dictionary catalog = {}", strategy.getValue());

Expand Down Expand Up @@ -153,15 +157,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
LogMinerHelper.buildDataDictionary(connection);
}

// Abandon long running transactions
Optional<Long> lastScnToAbandonTransactions = LogMinerHelper.getLastScnFromTheOldestOnlineRedo(connection, offsetContext.getScn());
lastScnToAbandonTransactions.ifPresent(nextOffsetScn -> {
transactionalBuffer.abandonLongTransactions(nextOffsetScn);
LOGGER.debug("After abandoning, offset before: {}, offset after:{}", offsetContext.getScn(), nextOffsetScn);
offsetContext.setScn(nextOffsetScn);
updateStartScn();
});

abandonOldTransactionsIfExist(connection);
LogMinerHelper.setRedoLogFilesForMining(connection, startScn);
}

Expand Down Expand Up @@ -222,6 +218,16 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
}
}

private void abandonOldTransactionsIfExist(Connection connection) throws SQLException {
Optional<Long> lastScnToAbandonTransactions = LogMinerHelper.getLastScnFromTheOldestOnlineRedo(connection, offsetContext.getScn());
lastScnToAbandonTransactions.ifPresent(nextOffsetScn -> {
transactionalBuffer.abandonLongTransactions(nextOffsetScn);
LOGGER.debug("After abandoning, offset before: {}, offset after:{}", offsetContext.getScn(), nextOffsetScn);
offsetContext.setScn(nextOffsetScn);
updateStartScn();
});
}

private void updateStartScn() {
long nextStartScn = transactionalBuffer.getLargestScn().equals(BigDecimal.ZERO) ? endScn : transactionalBuffer.getLargestScn().longValue();
if (nextStartScn <= startScn) {
Expand Down
Expand Up @@ -25,6 +25,8 @@ class SqlUtils {

static final String BUILD_DICTIONARY = "BEGIN DBMS_LOGMNR_D.BUILD (options => DBMS_LOGMNR_D.STORE_IN_REDO_LOGS); END;";
static final String CURRENT_SCN = "SELECT CURRENT_SCN FROM V$DATABASE";
static final String CURRENT_MILLIS = "select TO_CHAR(extract(day from(sys_extract_utc(systimestamp) - to_timestamp('1970-01-01', 'YYYY-MM-DD'))) * 86400000 " +
" + to_number(to_char(sys_extract_utc(systimestamp), 'SSSSSFF3'))) from dual";
static final String END_LOGMNR = "BEGIN SYS.DBMS_LOGMNR.END_LOGMNR(); END;";
static final String OLDEST_FIRST_CHANGE = "SELECT MIN(FIRST_CHANGE#) FROM V$LOG";
static final String ALL_ONLINE_LOGS = "SELECT MIN(F.MEMBER) AS FILE_NAME, L.NEXT_CHANGE# AS NEXT_CHANGE " +
Expand Down
Expand Up @@ -37,11 +37,15 @@ public class TransactionalBufferMetrics extends Metrics implements Transactional
private AtomicReference<Set<String>> rolledBackTransactionIds = new AtomicReference<>();
private Instant startTime;
private static long MILLIS_PER_SECOND = 1000L;
private AtomicLong timeDifference = new AtomicLong();



// temp todo delete after stowplan testing
private Long ufvDelete = 0L;
private Long ufvInsert = 0L;
private Long wiDelete = 0L;
private Long wiInsert = 0L;

public Long getUfvDelete() {
return ufvDelete;
Expand All @@ -66,12 +70,31 @@ public void decrementUfvInsert() {
this.ufvInsert--;
}

@Override
public Long getWiDelete() {
return wiDelete;
}

@Override
public void incrementWiDelete() {
this.wiDelete++;
}

@Override
public Long getWiInsert() {
return wiInsert;
}

@Override
public void incrementWiInsert() {
wiInsert++;
}

TransactionalBufferMetrics(CdcSourceTaskContext taskContext) {
super(taskContext, "log-miner-transactional-buffer");
startTime = Instant.now();
oldestScn.set(-1);
committedScn.set(-1);
lagFromTheSource.set(Duration.ZERO);
reset();
}

Expand All @@ -84,10 +107,15 @@ public void setCommittedScn(Long scn){
committedScn.set(scn);
}

// todo deal with timezones
public void setTimeDifference(AtomicLong timeDifference) {
this.timeDifference = timeDifference;
}

void setLagFromTheSource(Instant changeTime){
if (changeTime != null) {
lagFromTheSource.set(Duration.between(changeTime, Instant.now()));
// lagFromTheSource.set(Duration.between(Instant.now(), changeTime.minus(Duration.ofMillis(timeDifference.longValue()))).abs());
lagFromTheSource.set(Duration.between(Instant.now(), changeTime).abs());

if (maxLagFromTheSource.get().toMillis() < lagFromTheSource.get().toMillis()) {
maxLagFromTheSource.set(lagFromTheSource.get());
}
Expand Down Expand Up @@ -219,6 +247,8 @@ public void reset() {
lagFromTheSource.set(Duration.ZERO);
ufvDelete = 0L;
ufvInsert = 0L;
wiInsert = 0L;
wiDelete = 0L;
}

@Override
Expand Down
Expand Up @@ -113,4 +113,9 @@ public interface TransactionalBufferMetricsMXBean {
void incrementUfvDelete();
Long getUfvInsert();
void incrementUfvInsert();

Long getWiDelete();
void incrementWiDelete();
Long getWiInsert();
void incrementWiInsert();
}

0 comments on commit b514c5e

Please sign in to comment.