Commit

Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

AndreyIg committed May 28, 2020
2 parents a54f7a3 + 57a0b1c commit 539c214
Showing 16 changed files with 487 additions and 157 deletions.
6 changes: 6 additions & 0 deletions debezium-connector-oracle/pom.xml
@@ -85,6 +85,12 @@
<artifactId>fest-assert</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
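The new test-scoped mockito-core dependency makes it possible to unit-test the JDBC helpers changed below without a live Oracle instance. A minimal sketch of such a test, assuming it lives in the same package as LogMinerHelper (the test class and scenario are illustrative, not part of this commit):

import static org.mockito.Mockito.*;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class LogMinerHelperTest {

    public void timeDifferenceIsZeroWhenDbReturnsNoRows() throws Exception {
        Connection connection = mock(Connection.class);
        PreparedStatement statement = mock(PreparedStatement.class);
        ResultSet rs = mock(ResultSet.class);

        // Stub the JDBC chain so the helper sees an empty result set
        when(connection.prepareStatement(anyString())).thenReturn(statement);
        when(statement.executeQuery()).thenReturn(rs);
        when(rs.next()).thenReturn(false);

        // getTimeDifference(...) falls back to 0 when no timestamp comes back
        assert LogMinerHelper.getTimeDifference(connection) == 0L;
    }
}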

<!-- Required by VerifyRecord -->
<dependency>
<groupId>io.confluent</groupId>
@@ -18,6 +18,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.util.HashSet;
@@ -36,6 +37,7 @@ public class LogMinerHelper {

private final static String UNKNOWN = "unknown";
private final static Logger LOGGER = LoggerFactory.getLogger(LogMinerHelper.class);
private enum DATATYPE {LONG, TIMESTAMP, STRING}

/**
* This builds data dictionary objects in redo log files.
@@ -71,16 +73,16 @@ public static long getCurrentScn(Connection connection) throws SQLException {
}
}

public static void createAuditTable(Connection connection) throws SQLException {
String tableExists = getStringResult(connection, SqlUtils.AUDIT_TABLE_EXISTS);
static void createAuditTable(Connection connection) throws SQLException {
String tableExists = (String) getSingleResult(connection, SqlUtils.AUDIT_TABLE_EXISTS, DATATYPE.STRING);
if (tableExists == null) {
executeCallableStatement(connection, SqlUtils.CREATE_AUDIT_TABLE);
}

String recordExists = getStringResult(connection, SqlUtils.AUDIT_TABLE_RECORD_EXISTS);
String recordExists = (String) getSingleResult(connection, SqlUtils.AUDIT_TABLE_RECORD_EXISTS, DATATYPE.STRING);
if (recordExists == null) {
executeCallableStatement(connection, SqlUtils.INSERT_AUDIT_TABLE);
executeCallableStatement(connection, "commit");
connection.commit();
}
}
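The switch from executeCallableStatement(connection, "commit") to connection.commit() moves the commit to the JDBC API, which only matters when auto-commit is off. A minimal sketch of the intended sequence (the auto-commit setting is an assumption, not shown in this diff):

connection.setAutoCommit(false);                           // assumed connector setting
executeCallableStatement(connection, SqlUtils.INSERT_AUDIT_TABLE);
connection.commit();   // JDBC-level commit instead of sending "commit" as a statement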

@@ -131,17 +133,18 @@ static void updateLogMinerMetrics(Connection connection, LogMinerMetrics metrics


/**
* Calculate time difference between database and connector
* Calculates the time difference between the database and connector clocks; the result is negative if the database clock is ahead.
* @param connection database connection
* @return difference in milliseconds
*/
static Long getTimeDifference(Connection connection) {
try {
Long dbCurrentMillis = getLongResult(connection, SqlUtils.CURRENT_MILLIS);
return Duration.between(Instant.now(), Instant.ofEpochMilli(dbCurrentMillis)).toMillis();
} catch (SQLException e) {
return 0L;
static long getTimeDifference(Connection connection) throws SQLException {
Timestamp dbCurrentMillis = (Timestamp) getSingleResult(connection, SqlUtils.CURRENT_TIMESTAMP, DATATYPE.TIMESTAMP);
if (dbCurrentMillis == null) {
return 0;
}
Instant fromDb = dbCurrentMillis.toInstant();
Instant now = Instant.now();
return Duration.between(fromDb, now).toMillis();
}
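A worked example of the sign convention in the rewritten method, using made-up instants: the result is positive when the connector clock is ahead and negative when the database clock is ahead.

Instant fromDb = Instant.parse("2020-05-28T12:00:00.000Z"); // database time
Instant now    = Instant.parse("2020-05-28T12:00:01.500Z"); // connector time
long diff = Duration.between(fromDb, now).toMillis();       // 1500: connector is ahead
// Swapping the two clocks would yield -1500: the database is ahead.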

/**
@@ -244,21 +247,6 @@ private static int getSwitchCount(Connection connection) {
return 0;
}

/**
* After a switch, we should remove it from the analysis.
* NOTE. It does not physically remove the log file.
*
* @param logFileName file to delete from the analysis
* @param connection container level database connection
* @throws SQLException if anything unexpected happens
*/
private static void removeLogFileFromMining(String logFileName, Connection connection) throws SQLException {
String removeLogFileFromMining = SqlUtils.getRemoveLogFileFromMiningStatement(logFileName);
executeCallableStatement(connection, removeLogFileFromMining);
LOGGER.debug("{} was removed from mining session", removeLogFileFromMining);

}

/**
* This method checks whether supplemental logging is enabled at the database level. This is a critical check; the connector cannot work without it.
* @param jdbcConnection oracle connection on logminer level
@@ -356,6 +344,16 @@ static Optional<Long> getLastScnFromTheOldestOnlineRedo(Connection connection, L
return Optional.empty();
}

static void logWarn(TransactionalBufferMetrics metrics, String format, Object...args){
LOGGER.warn(format, args);
metrics.incrementWarningCounter();
}

static void logError(TransactionalBufferMetrics metrics, String format, Object...args){
LOGGER.error(format, args);
metrics.incrementErrorCounter();
}
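These helpers centralize the pattern used throughout this commit: every warning or error is counted on the transactional buffer metrics as well as logged, so alerting can key off JMX counters instead of log scraping. A hypothetical call site:

LogMinerHelper.logWarn(transactionalBufferMetrics,
        "Scn is null for transactionId = {}", txId);
// The warning is written via SLF4J and incrementWarningCounter() is invoked,
// making the count visible through the metrics MXBean.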

/**
* get the number of online REDO log groups
* @param connection connection
@@ -398,23 +396,20 @@ private static Map<String, String> getMap(Connection connection, String query, S
}
}

private static String getStringResult(Connection connection, String query) throws SQLException {
private static Object getSingleResult(Connection connection, String query, DATATYPE type) throws SQLException {
try (PreparedStatement statement = connection.prepareStatement(query);
ResultSet rs = statement.executeQuery()) {
if (rs.next()) {
return rs.getString(1);
switch (type){
case LONG :
return rs.getLong(1);
case TIMESTAMP:
return rs.getTimestamp(1);
case STRING:
return rs.getString(1);
}
}
return null;
}
}

private static Long getLongResult(Connection connection, String query) throws SQLException {
try (PreparedStatement statement = connection.prepareStatement(query);
ResultSet rs = statement.executeQuery()) {
if (rs.next()) {
return rs.getLong(1);
}
return System.currentTimeMillis();
}
}
}
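getSingleResult collapses the former per-type helpers into one method that returns null when the query yields no rows — unlike the removed getLongResult, which fell back to System.currentTimeMillis(). Hypothetical call sites mirroring the casts used earlier in this file (the SCN query string is illustrative):

String tableExists = (String) getSingleResult(connection, SqlUtils.AUDIT_TABLE_EXISTS, DATATYPE.STRING);
Timestamp dbTime = (Timestamp) getSingleResult(connection, SqlUtils.CURRENT_TIMESTAMP, DATATYPE.TIMESTAMP);
Long scn = (Long) getSingleResult(connection, "SELECT CURRENT_SCN FROM V$DATABASE", DATATYPE.LONG);
// All three are null if the query returned no rows, so every caller must null-check.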
@@ -71,8 +71,8 @@ public void setCurrentScn(Long scn){
currentScn.set(scn);
}

public void setCapturedDmlCount(int count){
capturedDmlCount.set(count);
public void incrementCapturedDmlCount() {
capturedDmlCount.incrementAndGet();
}
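The DML counter changes from a set-once-per-batch value to a per-row increment, so the metric now grows monotonically across batches. A minimal sketch, assuming capturedDmlCount is an AtomicLong (its declaration is outside this hunk):

import java.util.concurrent.atomic.AtomicLong;

private final AtomicLong capturedDmlCount = new AtomicLong();

public void incrementCapturedDmlCount() {
    capturedDmlCount.incrementAndGet();   // one call per captured DML row
}

public long getCapturedDmlCount() {       // assumed MXBean read side
    return capturedDmlCount.get();
}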

public void setCurrentLogFileName(Set<String> names){
@@ -82,7 +82,7 @@ public interface LogMinerMetricsMXBean {
int getBatchSize();

/**
* this gives ability to manipulate maximum number of entries in Log Miner view to fetch.
* this sets the number of entries to fetch from the LogMiner view.
* The value is bounded to prevent abnormal settings
* @param size limit
*/
@@ -87,25 +87,25 @@ int processResult(ResultSet resultSet) {
break;
}
} catch (SQLException e) {
RowMapper.logError(e, "Closed resultSet");
LogMinerHelper.logError(transactionalBufferMetrics, "Closed resultSet");
return 0;
}

Instant iterationStart = Instant.now();

BigDecimal scn = RowMapper.getScn(resultSet);
String redo_sql = RowMapper.getSqlRedo(resultSet);
String tableName = RowMapper.getTableName(resultSet);
String segOwner = RowMapper.getSegOwner(resultSet);
int operationCode = RowMapper.getOperationCode(resultSet);
Timestamp changeTime = RowMapper.getChangeTime(resultSet);
String txId = RowMapper.getTransactionId(resultSet);
BigDecimal scn = RowMapper.getScn(transactionalBufferMetrics, resultSet);
String redo_sql = RowMapper.getSqlRedo(transactionalBufferMetrics, resultSet);
String tableName = RowMapper.getTableName(transactionalBufferMetrics, resultSet);
String segOwner = RowMapper.getSegOwner(transactionalBufferMetrics, resultSet);
int operationCode = RowMapper.getOperationCode(transactionalBufferMetrics, resultSet);
Timestamp changeTime = RowMapper.getChangeTime(transactionalBufferMetrics, resultSet);
String txId = RowMapper.getTransactionId(transactionalBufferMetrics, resultSet);

String logMessage = String.format("transactionId = %s, SCN= %s, table_name= %s, segOwner= %s, operationCode=%s, offsetSCN= %s, " +
" commitOffsetSCN= %s", txId, scn, tableName, segOwner, operationCode, offsetContext.getScn(), offsetContext.getCommitScn());

if (scn == null) {
LOGGER.warn("Scn is null for {}", logMessage);
LogMinerHelper.logWarn(transactionalBufferMetrics, "Scn is null for {}", logMessage);
return 0;
}

@@ -136,14 +136,15 @@ int processResult(ResultSet resultSet) {

// MISSING_SCN
if (operationCode == RowMapper.MISSING_SCN) {
LOGGER.warn("Missing SCN, {}", logMessage);
LogMinerHelper.logWarn(transactionalBufferMetrics, "Missing SCN, {}", logMessage);
continue;
}

// DML
if (operationCode == RowMapper.INSERT || operationCode == RowMapper.DELETE || operationCode == RowMapper.UPDATE) {
LOGGER.trace("DML, {}, sql {}", logMessage, redo_sql);
dmlCounter++;
metrics.incrementCapturedDmlCount();
iterationStart = Instant.now();
LogMinerRowLcr rowLcr = dmlParser.parse(redo_sql, schema.getTables(), txId);
cumulativeParseTime = cumulativeParseTime.plus(Duration.between(iterationStart, Instant.now()));
@@ -189,6 +190,14 @@ int processResult(ResultSet resultSet) {
transactionalBufferMetrics.incrementWiDelete();
}
}
if ("road_truck_visit_details".equalsIgnoreCase(tableName)) {
if (operationCode == 1) {
transactionalBufferMetrics.incrementRTDInsert();
}
if (operationCode == 2) {
transactionalBufferMetrics.incrementRTDDelete();
}
}

transactionalBuffer.registerCommitCallback(txId, scn, changeTime.toInstant(), redo_sql, (timestamp, smallestScn, commitScn, counter) -> {
// update the SCN in the offset context only if the processed SCN is less than the smallest SCN among the other ongoing transactions
@@ -212,12 +221,11 @@ int processResult(ResultSet resultSet) {
cumulativeOtherTime = cumulativeOtherTime.plus(Duration.between(iterationStart, Instant.now()));

} catch (Exception e) {
LOGGER.error("Following rowLcr: {} cannot be dispatched due to the : {}", rowLcr, e);
LogMinerHelper.logError(transactionalBufferMetrics, "Following rowLcr: {} cannot be dispatched due to the : {}", rowLcr, e);
}
}
}
metrics.setProcessedCapturedBatchDuration(Duration.between(startTime, Instant.now()));
metrics.setCapturedDmlCount(dmlCounter);
if (dmlCounter > 0 || commitCounter > 0 || rollbackCounter > 0) {
warnStuckScn();
currentOffsetScn = offsetContext.getScn();
@@ -241,9 +249,11 @@ int processResult(ResultSet resultSet) {
private void warnStuckScn() {
if (currentOffsetScn == offsetContext.getScn() && currentOffsetCommitScn != offsetContext.getCommitScn()) {
stuckScnCounter++;
// warn only once
// emit this warning only once
if (stuckScnCounter == 5) {
LOGGER.warn("Offset SCN {} did not change in five mining cycles, hence the oldest transaction was not committed. Offset commit SCN: {}", currentOffsetScn, offsetContext.getCommitScn());
LogMinerHelper.logWarn(transactionalBufferMetrics,
"Offset SCN {} did not change in five mining cycles, hence the oldest transaction was not committed. Offset commit SCN: {}", currentOffsetScn, offsetContext.getCommitScn());
transactionalBufferMetrics.incrementScnFreezeCounter();
}
} else {
stuckScnCounter = 0;
@@ -49,8 +49,6 @@ public class LogMinerStreamingChangeEventSource implements StreamingChangeEventS
private final OracleDatabaseSchema schema;
private final OracleOffsetContext offsetContext;
private final TransactionalBuffer transactionalBuffer;
// todo introduce injection of appropriate parser
// private final OracleDmlParser dmlParser;
private final SimpleDmlParser dmlParser;
private final String catalogName;
private OracleConnectorConfig connectorConfig;
@@ -71,17 +69,12 @@ public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig,
this.schema = schema;
this.offsetContext = offsetContext;
OracleChangeRecordValueConverter converters = new OracleChangeRecordValueConverter(jdbcConnection);

this.connectorConfig = connectorConfig;

// this.dmlParser = new OracleDmlParser(true, connectorConfig.getDatabaseName(), connectorConfig.getSchemaName(),
// converters);
this.catalogName = (connectorConfig.getPdbName() != null) ? connectorConfig.getPdbName() : connectorConfig.getDatabaseName();
this.dmlParser = new SimpleDmlParser(catalogName, connectorConfig.getSchemaName(), converters);
this.transactionalBufferMetrics = new TransactionalBufferMetrics(taskContext);
this.transactionalBufferMetrics.register(LOGGER);
transactionalBuffer = new TransactionalBuffer(connectorConfig.getLogicalName(), errorHandler, transactionalBufferMetrics);

this.logMinerMetrics = new LogMinerMetrics(taskContext);
this.logMinerMetrics.register(LOGGER);
this.strategy = connectorConfig.getLogMiningStrategy();
@@ -176,13 +169,10 @@ public void execute(ChangeEventSourceContext context) {
processor.processResult(res);

updateStartScn();

// get largest scn from the last uncommitted transaction and set as last processed scn
LOGGER.trace("largest scn = {}", transactionalBuffer.getLargestScn());

// update SCN in offset context only if buffer is empty, otherwise we update offset in TransactionalBuffer
if (transactionalBuffer.isEmpty()) {
// When the buffer is empty, move mining boundaries forward
offsetContext.setScn(startScn);
transactionalBuffer.resetLargestScn(null);
}
@@ -191,17 +181,16 @@
// we don't do it for other modes to save time on building data dictionary
// if (strategy == OracleConnectorConfig.LogMiningStrategy.ONLINE_CATALOG) {
// LogMinerHelper.endMining(connection);
// LogMinerHelper.setRedoLogFilesForMining(connection, startScn);
// LogMinerHelper.updateLogMinerMetrics(connection, logMinerMetrics);
// currentRedoLogFiles = LogMinerHelper.getCurrentRedoLogFiles(connection, logMinerMetrics);
// }
}
} catch (Throwable e) {
if (connectionProblem(e)) {
LOGGER.warn("Disconnection occurred. {} ", e.toString());
LogMinerHelper.logWarn(transactionalBufferMetrics, "Disconnection occurred. {} ", e.toString());
continue;
}
LOGGER.error("Mining session was stopped due to the {} ", e.toString());
LogMinerHelper.logError(transactionalBufferMetrics, "Mining session was stopped due to the {} ", e.toString());
throw new RuntimeException(e);
} finally {
LOGGER.info("startScn={}, endScn={}, offsetContext.getScn()={}", startScn, endScn, offsetContext.getScn());
@@ -215,7 +204,7 @@ public void execute(ChangeEventSourceContext context) {
private void abandonOldTransactionsIfExist(Connection connection) throws SQLException {
Optional<Long> lastScnToAbandonTransactions = LogMinerHelper.getLastScnFromTheOldestOnlineRedo(connection, offsetContext.getScn());
lastScnToAbandonTransactions.ifPresent(thresholdScn -> {
LOGGER.debug("All transactions with first SCN <= {} will be abandoned, offset: {}", thresholdScn, offsetContext.getScn());
LogMinerHelper.logWarn(transactionalBufferMetrics, "All transactions with first SCN <= {} will be abandoned, offset: {}", thresholdScn, offsetContext.getScn());
transactionalBuffer.abandonLongTransactions(thresholdScn);
offsetContext.setScn(thresholdScn);
updateStartScn();
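Taken together, the execute() changes route both failure branches through the metrics-aware helpers while keeping the retry semantics: connection problems are logged as warnings and the mining loop continues, anything else aborts the session. A condensed sketch of that control flow, simplified from the hunks above:

while (context.isRunning()) {
    try {
        // ... mine one batch and dispatch its results ...
    } catch (Throwable e) {
        if (connectionProblem(e)) {
            LogMinerHelper.logWarn(transactionalBufferMetrics, "Disconnection occurred. {} ", e.toString());
            continue;   // reconnect and resume on the next iteration
        }
        LogMinerHelper.logError(transactionalBufferMetrics, "Mining session was stopped due to the {} ", e.toString());
        throw new RuntimeException(e);
    } finally {
        LOGGER.info("startScn={}, endScn={}, offsetContext.getScn()={}", startScn, endScn, offsetContext.getScn());
    }
}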
