From 57a0b1cb5f80c4de91358d1990d7dffcbed7ffd5 Mon Sep 17 00:00:00 2001 From: Ignatenko Andrey Date: Thu, 28 May 2020 13:42:30 -0700 Subject: [PATCH] Merge pull request #54 in N4FRA/debezium from DSCON-372_clean_Oracle_connector_code to master Squashed commit of the following: commit 88132371b02e0a4f8a285b3521163c60d2bcb5c0 Merge: e0ce77ec a54f7a3b Author: AndreyIg Date: Thu May 28 12:16:35 2020 -0700 merge conflicts commit e0ce77eca443cd6315cb7f5402b0007a04b7aba4 Author: AndreyIg Date: Thu May 28 12:10:43 2020 -0700 DSCON-372, clean up code of Oracle connector commit a54f7a3b65b7399d5d2fa1ca790cc21476a89b54 Merge: 796bf4b6 7c49efd0 Author: AndreyIg Date: Fri May 22 14:31:05 2020 -0700 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit dd4b64677940330a48f32dc5e2c6a76dcbb18cc2 Author: AndreyIg Date: Fri May 22 14:26:47 2020 -0700 DSCON-364, JSQLParserException with new tables commit aaa85742ecd60ace8e6baaa85aafb84fb3b7ff37 Merge: df3d36ac 796bf4b6 Author: AndreyIg Date: Fri May 15 12:19:28 2020 -0700 Merge branch 'master' into ARGO-209312_DBC_Oracle_RAC commit 796bf4b67c05d94e0e2cc106481a05d49abc01e8 Merge: 8b36431f 2c1fc9e6 Author: AndreyIg Date: Fri May 15 12:19:16 2020 -0700 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit df3d36ac7d0d8a7927d90b47fd6c92555644f285 Author: AndreyIg Date: Fri May 15 12:16:18 2020 -0700 ARGO-209312, db connector on Oracle RAC commit aac6d3a4110a4ec08b39f68a39ec1e37eb66e7eb Merge: 44bc0414 8b36431f Author: AndreyIg Date: Fri May 15 12:15:50 2020 -0700 Merge branch 'master' into ARGO-209312_DBC_Oracle_RAC commit 8b36431f0da8dd3fcd784f4f22a61bee00af239e Merge: 66c207ff 70ad3037 Author: AndreyIg Date: Wed May 13 16:37:01 2020 -0700 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit 44bc04142f38740940ef247540e1b370d1754e11 Author: AndreyIg Date: Wed May 13 16:34:20 2020 -0700 ARGO-209312, db connector on Oracle RAC commit 34e10b92b354dbc3804f4c9ed90eb27a9d8f5fa4 Merge: c953356a 66c207ff Author: AndreyIg Date: Tue May 12 11:18:25 2020 -0700 Merge branch 'master' into DSCON-301_DBC_Crashed_after_hours_downtime commit 66c207ff7828caac31fa0f9f084f0628a9968d37 Merge: f1810b91 d5de8d8e Author: AndreyIg Date: Tue May 12 11:17:49 2020 -0700 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit c953356a2b5aed88846be8be41f97249f0f96570 Author: AndreyIg Date: Tue May 12 09:51:42 2020 -0700 DSCON-301, DBC Crashed after 2 hours of downtime commit 3ec1c0bc00d9f7e9737292660cf7f4989b26888a Merge: bccb2c81 f1810b91 Author: AndreyIg Date: Thu May 7 15:00:23 2020 -0700 Merge branch 'master' into DSCON-268_manage_millisecondToSleepBetweenMiningQuery commit f1810b910b2ccc5e56a15c057e122775ab901070 Merge: 3f0c9de3 a043ebd9 Author: AndreyIg Date: Thu May 7 14:59:46 2020 -0700 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit bccb2c81683362ece4dcc8bd162dba3144ced410 Author: AndreyIg Date: Thu May 7 14:56:49 2020 -0700 DSCON-268, manage millisecondToSleepBetweenMiningQuery value commit ac591b18fd5b1f5def9126e1ac25ee4c6c5649df Author: AndreyIg Date: Thu May 7 14:54:30 2020 -0700 DSCON-268, manage millisecondToSleepBetweenMiningQuery value commit 6abfd55c1a6e740dd22c0977c42d1baa834dc47f Merge: 38ac1d4d 3f0c9de3 Author: AndreyIg Date: Thu May 7 14:52:44 2020 -0700 Merge branch 'master' into DSCON-268_manage_millisecondToSleepBetweenMiningQuery commit 3f0c9de335a85265164dc86b986cb57bd05a80ef Merge: 2de04280 5481902e Author: AndreyIg Date: Thu May 7 14:52:06 2020 -0700 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit 38ac1d4d0ae19bcfcc1e110d509e11606f5ff668 Merge: c4d2db35 2de04280 Author: AndreyIg Date: Mon May 4 11:46:18 2020 -0700 Merge branch 'master' into DSCON-262_Analyse_address_parsing_errors ... and 49 more commits --- debezium-connector-oracle/pom.xml | 6 + .../oracle/logminer/LogMinerHelper.java | 71 +++++---- .../oracle/logminer/LogMinerMetrics.java | 4 +- .../logminer/LogMinerMetricsMXBean.java | 2 +- .../LogMinerQueryResultProcessor.java | 38 +++-- .../LogMinerStreamingChangeEventSource.java | 17 +-- .../connector/oracle/logminer/RowMapper.java | 34 ++--- .../connector/oracle/logminer/SqlUtils.java | 13 +- .../oracle/logminer/TransactionalBuffer.java | 53 +++---- .../logminer/TransactionalBufferMetrics.java | 103 ++++++++++--- .../TransactionalBufferMetricsMXBean.java | 23 ++- .../oracle/LogMinerOracleConnectorIT.java | 21 +++ .../oracle/logminer/LogMinerMetricsTest.java | 90 ++++++++++++ .../TransactionalBufferMetricsTest.java | 138 ++++++++++++++++++ .../logminer/TransactionalBufferTest.java | 4 +- .../connector/oracle/util/TestHelper.java | 27 ++-- 16 files changed, 487 insertions(+), 157 deletions(-) create mode 100644 debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerMetricsTest.java create mode 100644 debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetricsTest.java diff --git a/debezium-connector-oracle/pom.xml b/debezium-connector-oracle/pom.xml index 71eea7b5a..2a84a1328 100644 --- a/debezium-connector-oracle/pom.xml +++ b/debezium-connector-oracle/pom.xml @@ -85,6 +85,12 @@ fest-assert test + + org.mockito + mockito-core + test + + io.confluent diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerHelper.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerHelper.java index 1d0ba809d..2a6624f05 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerHelper.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerHelper.java @@ -18,6 +18,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Timestamp; import java.time.Duration; import java.time.Instant; import java.util.HashSet; @@ -36,6 +37,7 @@ public class LogMinerHelper { private final static String UNKNOWN = "unknown"; private final static Logger LOGGER = LoggerFactory.getLogger(LogMinerHelper.class); + private enum DATATYPE {LONG, TIMESTAMP, STRING} /** * This builds data dictionary objects in redo log files. @@ -71,16 +73,16 @@ public static long getCurrentScn(Connection connection) throws SQLException { } } - public static void createAuditTable(Connection connection) throws SQLException { - String tableExists = getStringResult(connection, SqlUtils.AUDIT_TABLE_EXISTS); + static void createAuditTable(Connection connection) throws SQLException { + String tableExists = (String) getSingleResult(connection, SqlUtils.AUDIT_TABLE_EXISTS, DATATYPE.STRING); if (tableExists == null) { executeCallableStatement(connection, SqlUtils.CREATE_AUDIT_TABLE); } - String recordExists = getStringResult(connection, SqlUtils.AUDIT_TABLE_RECORD_EXISTS); + String recordExists = (String) getSingleResult(connection, SqlUtils.AUDIT_TABLE_RECORD_EXISTS, DATATYPE.STRING); if (recordExists == null) { executeCallableStatement(connection, SqlUtils.INSERT_AUDIT_TABLE); - executeCallableStatement(connection, "commit"); + connection.commit(); } } @@ -131,17 +133,18 @@ static void updateLogMinerMetrics(Connection connection, LogMinerMetrics metrics /** - * Calculate time difference between database and connector + * Calculate time difference between database and connector timers. It could be negative if DB time is ahead. * @param connection connection * @return difference in milliseconds */ - static Long getTimeDifference(Connection connection) { - try { - Long dbCurrentMillis = getLongResult(connection, SqlUtils.CURRENT_MILLIS); - return Duration.between(Instant.now(), Instant.ofEpochMilli(dbCurrentMillis)).toMillis(); - } catch (SQLException e) { - return 0L; + static long getTimeDifference(Connection connection) throws SQLException { + Timestamp dbCurrentMillis = (Timestamp) getSingleResult(connection, SqlUtils.CURRENT_TIMESTAMP, DATATYPE.TIMESTAMP); + if (dbCurrentMillis == null) { + return 0; } + Instant fromDb = dbCurrentMillis.toInstant(); + Instant now = Instant.now(); + return Duration.between(fromDb, now).toMillis(); } /** @@ -244,21 +247,6 @@ private static int getSwitchCount(Connection connection) { return 0; } - /** - * After a switch, we should remove it from the analysis. - * NOTE. It does not physically remove the log file. - * - * @param logFileName file to delete from the analysis - * @param connection container level database connection - * @throws SQLException if anything unexpected happens - */ - private static void removeLogFileFromMining(String logFileName, Connection connection) throws SQLException { - String removeLogFileFromMining = SqlUtils.getRemoveLogFileFromMiningStatement(logFileName); - executeCallableStatement(connection, removeLogFileFromMining); - LOGGER.debug("{} was removed from mining session", removeLogFileFromMining); - - } - /** * This method checks if supplemental logging was set on the database level. This is critical check, cannot work if not. * @param jdbcConnection oracle connection on logminer level @@ -356,6 +344,16 @@ static Optional getLastScnFromTheOldestOnlineRedo(Connection connection, L return Optional.empty(); } + static void logWarn(TransactionalBufferMetrics metrics, String format, Object...args){ + LOGGER.warn(format, args); + metrics.incrementWarningCounter(); + } + + static void logError(TransactionalBufferMetrics metrics, String format, Object...args){ + LOGGER.error(format, args); + metrics.incrementErrorCounter(); + } + /** * get size of online REDO groups * @param connection connection @@ -398,23 +396,20 @@ private static Map getMap(Connection connection, String query, S } } - private static String getStringResult(Connection connection, String query) throws SQLException { + private static Object getSingleResult(Connection connection, String query, DATATYPE type) throws SQLException { try (PreparedStatement statement = connection.prepareStatement(query); ResultSet rs = statement.executeQuery()) { if (rs.next()) { - return rs.getString(1); + switch (type){ + case LONG : + return rs.getLong(1); + case TIMESTAMP: + return rs.getTimestamp(1); + case STRING: + return rs.getString(1); + } } return null; } } - - private static Long getLongResult(Connection connection, String query) throws SQLException { - try (PreparedStatement statement = connection.prepareStatement(query); - ResultSet rs = statement.executeQuery()) { - if (rs.next()) { - return rs.getLong(1); - } - return System.currentTimeMillis(); - } - } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerMetrics.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerMetrics.java index 499a8a653..aa4df7159 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerMetrics.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerMetrics.java @@ -71,8 +71,8 @@ public void setCurrentScn(Long scn){ currentScn.set(scn); } - public void setCapturedDmlCount(int count){ - capturedDmlCount.set(count); + public void incrementCapturedDmlCount() { + capturedDmlCount.incrementAndGet(); } public void setCurrentLogFileName(Set names){ diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerMetricsMXBean.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerMetricsMXBean.java index 840bf8892..27faa19be 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerMetricsMXBean.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerMetricsMXBean.java @@ -82,7 +82,7 @@ public interface LogMinerMetricsMXBean { int getBatchSize(); /** - * this gives ability to manipulate maximum number of entries in Log Miner view to fetch. + * this gives ability to manipulate number of entries in Log Miner view to fetch. * It has limits to prevent abnormal values * @param size limit */ diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java index 164e7f830..2c1211bbc 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java @@ -87,25 +87,25 @@ int processResult(ResultSet resultSet) { break; } } catch (SQLException e) { - RowMapper.logError(e, "Closed resultSet"); + LogMinerHelper.logError(transactionalBufferMetrics, "Closed resultSet"); return 0; } Instant iterationStart = Instant.now(); - BigDecimal scn = RowMapper.getScn(resultSet); - String redo_sql = RowMapper.getSqlRedo(resultSet); - String tableName = RowMapper.getTableName(resultSet); - String segOwner = RowMapper.getSegOwner(resultSet); - int operationCode = RowMapper.getOperationCode(resultSet); - Timestamp changeTime = RowMapper.getChangeTime(resultSet); - String txId = RowMapper.getTransactionId(resultSet); + BigDecimal scn = RowMapper.getScn(transactionalBufferMetrics, resultSet); + String redo_sql = RowMapper.getSqlRedo(transactionalBufferMetrics, resultSet); + String tableName = RowMapper.getTableName(transactionalBufferMetrics, resultSet); + String segOwner = RowMapper.getSegOwner(transactionalBufferMetrics, resultSet); + int operationCode = RowMapper.getOperationCode(transactionalBufferMetrics, resultSet); + Timestamp changeTime = RowMapper.getChangeTime(transactionalBufferMetrics, resultSet); + String txId = RowMapper.getTransactionId(transactionalBufferMetrics, resultSet); String logMessage = String.format("transactionId = %s, SCN= %s, table_name= %s, segOwner= %s, operationCode=%s, offsetSCN= %s, " + " commitOffsetSCN= %s", txId, scn, tableName, segOwner, operationCode, offsetContext.getScn(), offsetContext.getCommitScn()); if (scn == null) { - LOGGER.warn("Scn is null for {}", logMessage); + LogMinerHelper.logWarn(transactionalBufferMetrics, "Scn is null for {}", logMessage); return 0; } @@ -136,7 +136,7 @@ int processResult(ResultSet resultSet) { // MISSING_SCN if (operationCode == RowMapper.MISSING_SCN) { - LOGGER.warn("Missing SCN, {}", logMessage); + LogMinerHelper.logWarn(transactionalBufferMetrics, "Missing SCN, {}", logMessage); continue; } @@ -144,6 +144,7 @@ int processResult(ResultSet resultSet) { if (operationCode == RowMapper.INSERT || operationCode == RowMapper.DELETE || operationCode == RowMapper.UPDATE) { LOGGER.trace("DML, {}, sql {}", logMessage, redo_sql); dmlCounter++; + metrics.incrementCapturedDmlCount(); iterationStart = Instant.now(); LogMinerRowLcr rowLcr = dmlParser.parse(redo_sql, schema.getTables(), txId); cumulativeParseTime = cumulativeParseTime.plus(Duration.between(iterationStart, Instant.now())); @@ -189,6 +190,14 @@ int processResult(ResultSet resultSet) { transactionalBufferMetrics.incrementWiDelete(); } } + if ("road_truck_visit_details".equalsIgnoreCase(tableName)) { + if (operationCode == 1) { + transactionalBufferMetrics.incrementRTDInsert(); + } + if (operationCode == 2) { + transactionalBufferMetrics.incrementRTDDelete(); + } + } transactionalBuffer.registerCommitCallback(txId, scn, changeTime.toInstant(), redo_sql, (timestamp, smallestScn, commitScn, counter) -> { // update SCN in offset context only if processed SCN less than SCN among other transactions @@ -212,12 +221,11 @@ int processResult(ResultSet resultSet) { cumulativeOtherTime = cumulativeOtherTime.plus(Duration.between(iterationStart, Instant.now())); } catch (Exception e) { - LOGGER.error("Following rowLcr: {} cannot be dispatched due to the : {}", rowLcr, e); + LogMinerHelper.logError(transactionalBufferMetrics, "Following rowLcr: {} cannot be dispatched due to the : {}", rowLcr, e); } } } metrics.setProcessedCapturedBatchDuration(Duration.between(startTime, Instant.now())); - metrics.setCapturedDmlCount(dmlCounter); if (dmlCounter > 0 || commitCounter > 0 || rollbackCounter > 0) { warnStuckScn(); currentOffsetScn = offsetContext.getScn(); @@ -241,9 +249,11 @@ int processResult(ResultSet resultSet) { private void warnStuckScn() { if (currentOffsetScn == offsetContext.getScn() && currentOffsetCommitScn != offsetContext.getCommitScn()) { stuckScnCounter++; - // warn only once + // logWarn only once if (stuckScnCounter == 5) { - LOGGER.warn("Offset SCN {} did not change in five mining cycles, hence the oldest transaction was not committed. Offset commit SCN: {}", currentOffsetScn, offsetContext.getCommitScn()); + LogMinerHelper.logWarn(transactionalBufferMetrics, + "Offset SCN {} did not change in five mining cycles, hence the oldest transaction was not committed. Offset commit SCN: {}", currentOffsetScn, offsetContext.getCommitScn()); + transactionalBufferMetrics.incrementScnFreezeCounter(); } } else { stuckScnCounter = 0; diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java index fe1cbcf51..d3ad6bf27 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java @@ -49,8 +49,6 @@ public class LogMinerStreamingChangeEventSource implements StreamingChangeEventS private final OracleDatabaseSchema schema; private final OracleOffsetContext offsetContext; private final TransactionalBuffer transactionalBuffer; - // todo introduce injection of appropriate parser -// private final OracleDmlParser dmlParser; private final SimpleDmlParser dmlParser; private final String catalogName; private OracleConnectorConfig connectorConfig; @@ -71,17 +69,12 @@ public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig, this.schema = schema; this.offsetContext = offsetContext; OracleChangeRecordValueConverter converters = new OracleChangeRecordValueConverter(jdbcConnection); - this.connectorConfig = connectorConfig; - -// this.dmlParser = new OracleDmlParser(true, connectorConfig.getDatabaseName(), connectorConfig.getSchemaName(), -// converters); this.catalogName = (connectorConfig.getPdbName() != null) ? connectorConfig.getPdbName() : connectorConfig.getDatabaseName(); this.dmlParser = new SimpleDmlParser(catalogName, connectorConfig.getSchemaName(), converters); this.transactionalBufferMetrics = new TransactionalBufferMetrics(taskContext); this.transactionalBufferMetrics.register(LOGGER); transactionalBuffer = new TransactionalBuffer(connectorConfig.getLogicalName(), errorHandler, transactionalBufferMetrics); - this.logMinerMetrics = new LogMinerMetrics(taskContext); this.logMinerMetrics.register(LOGGER); this.strategy = connectorConfig.getLogMiningStrategy(); @@ -176,13 +169,10 @@ public void execute(ChangeEventSourceContext context) { processor.processResult(res); updateStartScn(); - - // get largest scn from the last uncommitted transaction and set as last processed scn LOGGER.trace("largest scn = {}", transactionalBuffer.getLargestScn()); // update SCN in offset context only if buffer is empty, otherwise we update offset in TransactionalBuffer if (transactionalBuffer.isEmpty()) { - // When the buffer is empty, move mining boundaries forward offsetContext.setScn(startScn); transactionalBuffer.resetLargestScn(null); } @@ -191,17 +181,16 @@ public void execute(ChangeEventSourceContext context) { // we don't do it for other modes to save time on building data dictionary // if (strategy == OracleConnectorConfig.LogMiningStrategy.ONLINE_CATALOG) { // LogMinerHelper.endMining(connection); -// LogMinerHelper.setRedoLogFilesForMining(connection, startScn); // LogMinerHelper.updateLogMinerMetrics(connection, logMinerMetrics); // currentRedoLogFiles = LogMinerHelper.getCurrentRedoLogFiles(connection, logMinerMetrics); // } } } catch (Throwable e) { if (connectionProblem(e)) { - LOGGER.warn("Disconnection occurred. {} ", e.toString()); + LogMinerHelper.logWarn(transactionalBufferMetrics, "Disconnection occurred. {} ", e.toString()); continue; } - LOGGER.error("Mining session was stopped due to the {} ", e.toString()); + LogMinerHelper.logError(transactionalBufferMetrics, "Mining session was stopped due to the {} ", e.toString()); throw new RuntimeException(e); } finally { LOGGER.info("startScn={}, endScn={}, offsetContext.getScn()={}", startScn, endScn, offsetContext.getScn()); @@ -215,7 +204,7 @@ public void execute(ChangeEventSourceContext context) { private void abandonOldTransactionsIfExist(Connection connection) throws SQLException { Optional lastScnToAbandonTransactions = LogMinerHelper.getLastScnFromTheOldestOnlineRedo(connection, offsetContext.getScn()); lastScnToAbandonTransactions.ifPresent(thresholdScn -> { - LOGGER.debug("All transactions with first SCN <= {} will be abandoned, offset: {}", thresholdScn, offsetContext.getScn()); + LogMinerHelper.logWarn(transactionalBufferMetrics, "All transactions with first SCN <= {} will be abandoned, offset: {}", thresholdScn, offsetContext.getScn()); transactionalBuffer.abandonLongTransactions(thresholdScn); offsetContext.setScn(thresholdScn); updateStartScn(); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/RowMapper.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/RowMapper.java index 8da4e2b47..da91a817f 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/RowMapper.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/RowMapper.java @@ -43,61 +43,61 @@ public class RowMapper { private static final int TABLE_NAME = 7; private static final int SEG_OWNER = 8; - public static int getOperationCode(ResultSet rs) { + public static int getOperationCode(TransactionalBufferMetrics metrics, ResultSet rs) { try { return rs.getInt(OPERATION_CODE); } catch (SQLException e) { - logError(e, "OPERATION_CODE"); + logError(metrics, e, "OPERATION_CODE"); return 0; } } - public static String getTableName(ResultSet rs) { + public static String getTableName(TransactionalBufferMetrics metrics, ResultSet rs) { try { return rs.getString(TABLE_NAME); } catch (SQLException e) { - logError(e, "TABLE_NAME"); + logError(metrics, e, "TABLE_NAME"); return ""; } } - public static String getSegOwner(ResultSet rs) { + public static String getSegOwner(TransactionalBufferMetrics metrics, ResultSet rs) { try { return rs.getString(SEG_OWNER); } catch (SQLException e) { - logError(e, "SEG_OWNER"); + logError(metrics, e, "SEG_OWNER"); return ""; } } - public static Timestamp getChangeTime(ResultSet rs) { + public static Timestamp getChangeTime(TransactionalBufferMetrics metrics, ResultSet rs) { try { return rs.getTimestamp(CHANGE_TIME); } catch (SQLException e) { - logError(e, "CHANGE_TIME"); + logError(metrics, e, "CHANGE_TIME"); return new Timestamp(Instant.now().getEpochSecond()); } } - public static BigDecimal getScn(ResultSet rs) { + public static BigDecimal getScn(TransactionalBufferMetrics metrics, ResultSet rs) { try { return rs.getBigDecimal(SCN); } catch (SQLException e) { - logError(e, "SCN"); + logError(metrics, e, "SCN"); return new BigDecimal(-1); } } - public static String getTransactionId(ResultSet rs) { + public static String getTransactionId(TransactionalBufferMetrics metrics, ResultSet rs) { try { return DatatypeConverter.printHexBinary(rs.getBytes(TX_ID)); } catch (SQLException e) { - logError(e, "TX_ID"); + logError(metrics, e, "TX_ID"); return ""; } } - public static String getSqlRedo(ResultSet rs) { + public static String getSqlRedo(TransactionalBufferMetrics metrics, ResultSet rs) { StringBuilder result = new StringBuilder(); try { int csf = rs.getInt(CSF); @@ -109,7 +109,7 @@ public static String getSqlRedo(ResultSet rs) { } else { result = new StringBuilder(rs.getString(SQL_REDO)); int lobLimit = 40000; // todo : decide on approach ( XStream chunk option) and Lob limit - BigDecimal scn = getScn(rs); + BigDecimal scn = getScn(metrics, rs); while (csf == 1) { rs.next(); if (lobLimit-- == 0) { @@ -121,13 +121,13 @@ public static String getSqlRedo(ResultSet rs) { } } } catch (SQLException e) { - logError(e, "SQL_REDO"); + logError(metrics, e, "SQL_REDO"); } return result.toString(); } - static void logError(SQLException e, String s) { - LOGGER.error("Cannot get {}. This entry from log miner will be lost due to the {}", s, e); + private static void logError(TransactionalBufferMetrics metrics, SQLException e, String s) { + LogMinerHelper.logError(metrics, "Cannot get {}. This entry from log miner will be lost due to the {}", s, e); } public static TableId getTableId(String catalogName, ResultSet rs) throws SQLException { diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/SqlUtils.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/SqlUtils.java index 481cb1843..e623e69c2 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/SqlUtils.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/SqlUtils.java @@ -25,8 +25,7 @@ class SqlUtils { static final String BUILD_DICTIONARY = "BEGIN DBMS_LOGMNR_D.BUILD (options => DBMS_LOGMNR_D.STORE_IN_REDO_LOGS); END;"; static final String CURRENT_SCN = "SELECT CURRENT_SCN FROM V$DATABASE"; - static final String CURRENT_MILLIS = "select TO_CHAR(extract(day from(sys_extract_utc(systimestamp) - to_timestamp('1970-01-01', 'YYYY-MM-DD'))) * 86400000 " + - " + to_number(to_char(sys_extract_utc(systimestamp), 'SSSSSFF3'))) from dual"; + static final String CURRENT_TIMESTAMP = "select current_timestamp from dual"; static final String END_LOGMNR = "BEGIN SYS.DBMS_LOGMNR.END_LOGMNR(); END;"; static final String OLDEST_FIRST_CHANGE = "SELECT MIN(FIRST_CHANGE#) FROM V$LOG"; static final String ALL_ONLINE_LOGS = "SELECT MIN(F.MEMBER) AS FILE_NAME, L.NEXT_CHANGE# AS NEXT_CHANGE, F.GROUP# " + @@ -120,16 +119,6 @@ static String queryLogMinerContents(String schemaName, String logMinerUser, Orac " OR (OPERATION_CODE IN (7,34,36) AND USERNAME NOT IN ('SYS','SYSTEM','"+logMinerUser.toUpperCase()+"'))" + sorting; //todo username = schemaName? } - /** - * After mining archived log files, we should remove them from the analysis. - * NOTE. It does not physically remove the log file. - * @param logFileName file ro remove - * @return statement - */ - static String getRemoveLogFileFromMiningStatement(String logFileName) { - return "BEGIN SYS.DBMS_LOGMNR.REMOVE_LOGFILE('" + logFileName + "'); END;"; - } - static String getAddLogFileStatement(String option, String fileName) { return "BEGIN sys." + "dbms_logmnr.add_logfile(" + diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java index b98ce236a..3d469825a 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java @@ -26,7 +26,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -47,7 +46,7 @@ public final class TransactionalBuffer { private final ExecutorService executor; private final AtomicInteger taskCounter; private final ErrorHandler errorHandler; - private Optional metrics; + private TransactionalBufferMetrics metrics; private final Set abandonedTransactionIds; // storing rolledBackTransactionIds is for debugging purposes to check what was rolled back to research, todo delete in future releases @@ -62,19 +61,15 @@ public final class TransactionalBuffer { * Constructor to create a new instance. * * @param logicalName logical name - * @param errorHandler error handler - * @param metrics metrics MBean exposed + * @param errorHandler logError handler + * @param metrics metrics MBean */ TransactionalBuffer(String logicalName, ErrorHandler errorHandler, TransactionalBufferMetrics metrics) { this.transactions = new HashMap<>(); this.executor = Threads.newSingleThreadExecutor(OracleConnector.class, logicalName, "transactional-buffer"); this.taskCounter = new AtomicInteger(); this.errorHandler = errorHandler; - if (metrics != null) { - this.metrics = Optional.of(metrics); - } else { - this.metrics = Optional.empty(); - } + this.metrics = metrics; largestScn = BigDecimal.ZERO; lastCommittedScn = BigDecimal.ZERO; this.abandonedTransactionIds = new HashSet<>(); @@ -117,15 +112,15 @@ void resetLargestScn(Long value) { */ void registerCommitCallback(String transactionId, BigDecimal scn, Instant changeTime, String redoSql, CommitCallback callback) { if (abandonedTransactionIds.contains(transactionId)) { - LOGGER.warn("Another DML for an abandoned transaction {} : {}, ignored", transactionId, redoSql); + LogMinerHelper.logWarn(metrics, "Another DML for an abandoned transaction {} : {}, ignored", transactionId, redoSql); return; } transactions.computeIfAbsent(transactionId, s -> new Transaction(scn)); - metrics.ifPresent(m -> m.setActiveTransactions(transactions.size())); - metrics.ifPresent(TransactionalBufferMetrics::incrementCapturedDmlCounter); - metrics.ifPresent(m -> m.setLagFromTheSource(changeTime)); + metrics.setActiveTransactions(transactions.size()); + metrics.incrementCapturedDmlCounter(); + metrics.calculateLagMetrics(changeTime); // The transaction object is not a lightweight object anymore having all REDO_SQL stored. Transaction transaction = transactions.get(transactionId); @@ -133,7 +128,7 @@ void registerCommitCallback(String transactionId, BigDecimal scn, Instant change // todo this should never happen, delete when tested and confirmed if (rolledBackTransactionIds.contains(transactionId)) { - LOGGER.warn("Ignore DML for rolled back transaction: SCN={}, REDO_SQL={}", scn, redoSql); + LogMinerHelper.logWarn(metrics, "Ignore DML for rolled back transaction: SCN={}, REDO_SQL={}", scn, redoSql); return; } @@ -173,9 +168,9 @@ boolean commit(String transactionId, BigDecimal scn, OracleOffsetContext offsetC // On the restarting connector, we start from SCN in the offset. There is possibility to commit a transaction(s) which were already committed. // Currently we cannot use ">=", because we may lose normal commit which may happen at the same time. TODO use audit table to prevent duplications if ((offsetContext.getCommitScn() != null && offsetContext.getCommitScn() > scn.longValue()) || lastCommittedScn.longValue() > scn.longValue()) { - LOGGER.warn("Transaction {} was already processed, ignore. Committed SCN in offset is {}, commit SCN of the transaction is {}, last committed SCN is {}", + LogMinerHelper.logWarn(metrics, "Transaction {} was already processed, ignore. Committed SCN in offset is {}, commit SCN of the transaction is {}, last committed SCN is {}", transactionId, offsetContext.getCommitScn(), scn, lastCommittedScn); - metrics.ifPresent(m -> m.setActiveTransactions(transactions.size())); + metrics.setActiveTransactions(transactions.size()); return false; } @@ -192,12 +187,12 @@ boolean commit(String transactionId, BigDecimal scn, OracleOffsetContext offsetC } lastCommittedScn = new BigDecimal(scn.longValue()); - metrics.ifPresent(TransactionalBufferMetrics::incrementCommittedTransactions); - metrics.ifPresent(m -> m.setActiveTransactions(transactions.size())); - metrics.ifPresent(m -> m.incrementCommittedDmlCounter(commitCallbacks.size())); - metrics.ifPresent(m -> m.setCommittedScn(scn.longValue())); + metrics.incrementCommittedTransactions(); + metrics.setActiveTransactions(transactions.size()); + metrics.incrementCommittedDmlCounter(commitCallbacks.size()); + metrics.setCommittedScn(scn.longValue()); } catch (InterruptedException e) { - LOGGER.error("Thread interrupted during running", e); + LogMinerHelper.logError(metrics, "Thread interrupted during running", e); Thread.currentThread().interrupt(); } catch (Exception e) { errorHandler.setProducerThrowable(e); @@ -228,9 +223,9 @@ boolean rollback(String transactionId, String debugMessage) { abandonedTransactionIds.remove(transactionId); rolledBackTransactionIds.add(transactionId); - metrics.ifPresent(m -> m.setActiveTransactions(transactions.size())); - metrics.ifPresent(TransactionalBufferMetrics::incrementRolledBackTransactions); - metrics.ifPresent(m -> m.addRolledBackTransactionId(transactionId)); + metrics.setActiveTransactions(transactions.size()); + metrics.incrementRolledBackTransactions(); + metrics.addRolledBackTransactionId(transactionId); // todo decide if we need both metrics return true; } @@ -262,13 +257,13 @@ void abandonLongTransactions(Long thresholdScn) { while (iter.hasNext()) { Map.Entry transaction = iter.next(); if (transaction.getValue().firstScn.compareTo(threshold) <= 0) { - LOGGER.warn("Following long running transaction {} will be abandoned and ignored: {} ", transaction.getKey(), transaction.getValue().toString()); + LogMinerHelper.logWarn(metrics, "Following long running transaction {} will be abandoned and ignored: {} ", transaction.getKey(), transaction.getValue().toString()); abandonedTransactionIds.add(transaction.getKey()); iter.remove(); calculateLargestScn(); - metrics.ifPresent(t -> t.addAbandonedTransactionId(transaction.getKey())); - metrics.ifPresent(m -> m.setActiveTransactions(transactions.size())); + metrics.addAbandonedTransactionId(transaction.getKey()); + metrics.setActiveTransactions(transactions.size()); } } } @@ -279,7 +274,7 @@ private BigDecimal calculateSmallestScn() { .map(transaction -> transaction.firstScn) .min(BigDecimal::compareTo) .orElseThrow(() -> new DataException("Cannot calculate smallest SCN")); - metrics.ifPresent(m -> m.setOldestScn(scn == null ? -1 : scn.longValue())); + metrics.setOldestScn(scn == null ? -1 : scn.longValue()); return scn; } @@ -318,7 +313,7 @@ void close() { executor.shutdownNow(); } } catch (InterruptedException e) { - LOGGER.error("Thread interrupted during shutdown", e); + LogMinerHelper.logError(metrics, "Thread interrupted during shutdown", e); } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetrics.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetrics.java index 463c14f5d..5b5e773e9 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetrics.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetrics.java @@ -32,12 +32,15 @@ public class TransactionalBufferMetrics extends Metrics implements Transactional private AtomicLong committedDmlCounter = new AtomicLong(); private AtomicReference maxLagFromTheSource = new AtomicReference<>(); private AtomicReference minLagFromTheSource = new AtomicReference<>(); - private AtomicReference totalLagsFromTheSource = new AtomicReference<>(); + private AtomicReference averageLagsFromTheSource = new AtomicReference<>(); private AtomicReference> abandonedTransactionIds = new AtomicReference<>(); private AtomicReference> rolledBackTransactionIds = new AtomicReference<>(); private Instant startTime; private static long MILLIS_PER_SECOND = 1000L; private AtomicLong timeDifference = new AtomicLong(); + private AtomicInteger errorCounter = new AtomicInteger(); + private AtomicInteger warningCounter = new AtomicInteger(); + private AtomicInteger scnFreezeCounter = new AtomicInteger(); @@ -46,6 +49,8 @@ public class TransactionalBufferMetrics extends Metrics implements Transactional private Long ufvInsert = 0L; private Long wiDelete = 0L; private Long wiInsert = 0L; + private Long rtdDelete = 0L; + private Long rtdInsert = 0L; public Long getUfvDelete() { return ufvDelete; @@ -55,10 +60,6 @@ public void incrementUfvDelete() { this.ufvDelete++; } - public void decrementUfvDelete() { - this.ufvDelete--; - } - public Long getUfvInsert() { return ufvInsert; } @@ -66,10 +67,6 @@ public Long getUfvInsert() { public void incrementUfvInsert() { this.ufvInsert++; } - public void decrementUfvInsert() { - this.ufvInsert--; - } - @Override public Long getWiDelete() { return wiDelete; @@ -90,11 +87,32 @@ public void incrementWiInsert() { wiInsert++; } + @Override + public Long getRTDDelete() { + return rtdDelete; + } + + @Override + public void incrementRTDDelete() { + rtdDelete++; + } + + @Override + public Long getRTDInsert() { + return rtdInsert; + } + + @Override + public void incrementRTDInsert() { + rtdInsert++; + } + TransactionalBufferMetrics(CdcSourceTaskContext taskContext) { super(taskContext, "log-miner-transactional-buffer"); startTime = Instant.now(); oldestScn.set(-1); committedScn.set(-1); + timeDifference.set(0); reset(); } @@ -111,10 +129,10 @@ public void setTimeDifference(AtomicLong timeDifference) { this.timeDifference = timeDifference; } - void setLagFromTheSource(Instant changeTime){ + void calculateLagMetrics(Instant changeTime){ if (changeTime != null) { -// lagFromTheSource.set(Duration.between(Instant.now(), changeTime.minus(Duration.ofMillis(timeDifference.longValue()))).abs()); - lagFromTheSource.set(Duration.between(Instant.now(), changeTime).abs()); + Instant correctedChangeTime = changeTime.plus(Duration.ofMillis(timeDifference.longValue())); + lagFromTheSource.set(Duration.between(correctedChangeTime, Instant.now())); if (maxLagFromTheSource.get().toMillis() < lagFromTheSource.get().toMillis()) { maxLagFromTheSource.set(lagFromTheSource.get()); @@ -122,7 +140,12 @@ void setLagFromTheSource(Instant changeTime){ if (minLagFromTheSource.get().toMillis() > lagFromTheSource.get().toMillis()) { minLagFromTheSource.set(lagFromTheSource.get()); } - totalLagsFromTheSource.set(totalLagsFromTheSource.get().plus(lagFromTheSource.get())); + + if (averageLagsFromTheSource.get().isZero()) { + averageLagsFromTheSource.set(lagFromTheSource.get()); + } else { + averageLagsFromTheSource.set(averageLagsFromTheSource.get().plus(lagFromTheSource.get()).dividedBy(2)); + } } } @@ -160,6 +183,30 @@ void addRolledBackTransactionId(String transactionId){ } } + /** + * This is to increase logged logError counter. + * There are other ways to monitor the log, but this is just to check if there are any. + */ + void incrementErrorCounter() { + errorCounter.incrementAndGet(); + } + + /** + * This is to increase logged warning counter + * There are other ways to monitor the log, but this is just to check if there are any. + */ + void incrementWarningCounter() { + warningCounter.incrementAndGet(); + } + + /** + * This counter to accumulate number of encountered observations when SCN does not change in the offset. + * This call indicates an uncommitted oldest transaction in the buffer. + */ + void incrementScnFreezeCounter() { + scnFreezeCounter.incrementAndGet(); + } + // implemented getters @Override public Long getOldestScn() { @@ -218,7 +265,7 @@ public long getMinLagFromSource() { @Override public long getAverageLagFromSource() { - return totalLagsFromTheSource.get().toMillis()/(capturedDmlCounter.get() == 0 ? 1 : capturedDmlCounter.get()); + return averageLagsFromTheSource.get().toMillis(); } @Override @@ -231,24 +278,43 @@ public Set getRolledBackTransactionIds() { return rolledBackTransactionIds.get(); } + @Override + public int getErrorCounter() { + return errorCounter.get(); + } + + @Override + public int getWarningCounter() { + return warningCounter.get(); + } + + @Override + public int getScnFreezeCounter() { + return scnFreezeCounter.get(); + } + @Override public void reset() { maxLagFromTheSource.set(Duration.ZERO); minLagFromTheSource.set(Duration.ZERO); - totalLagsFromTheSource.set(Duration.ZERO); + averageLagsFromTheSource.set(Duration.ZERO); activeTransactions.set(0); rolledBackTransactions.set(0); committedTransactions.set(0); capturedDmlCounter.set(0); committedDmlCounter.set(0); - totalLagsFromTheSource.set(Duration.ZERO); abandonedTransactionIds.set(new HashSet<>()); rolledBackTransactionIds.set(new HashSet<>()); lagFromTheSource.set(Duration.ZERO); + errorCounter.set(0); + warningCounter.set(0); + scnFreezeCounter.set(0); ufvDelete = 0L; ufvInsert = 0L; wiInsert = 0L; wiDelete = 0L; + rtdInsert = 0L; + rtdDelete = 0L; } @Override @@ -264,8 +330,11 @@ public String toString() { ", committedDmlCounter=" + committedDmlCounter.get() + ", maxLagFromTheSource=" + maxLagFromTheSource.get() + ", minLagFromTheSource=" + minLagFromTheSource.get() + - ", averageLagsFromTheSource=" + getAverageLagFromSource() + + ", averageLagsFromTheSource=" + averageLagsFromTheSource.get() + ", abandonedTransactionIds=" + abandonedTransactionIds.get() + + ", errorCounter=" + errorCounter.get() + + ", warningCounter=" + warningCounter.get() + + ", scnFreezeCounter=" + scnFreezeCounter.get() + '}'; } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetricsMXBean.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetricsMXBean.java index 13a465571..82a16c71b 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetricsMXBean.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetricsMXBean.java @@ -41,7 +41,7 @@ public interface TransactionalBufferMetricsMXBean { long getCapturedDmlThroughput(); /** - * exposes tatal number of captured DMLs + * exposes total number of captured DMLs * @return captured DML count */ long getCapturedDmlCount(); @@ -108,6 +108,22 @@ public interface TransactionalBufferMetricsMXBean { */ void reset(); + /** + * This is to get logged logError counter. + */ + int getErrorCounter(); + + /** + * This is to get logged warning counter + */ + int getWarningCounter(); + + /** + * Get counter of encountered observations when SCN does not change in the offset. + */ + int getScnFreezeCounter(); + + // todo delete after stowplan test Long getUfvDelete(); void incrementUfvDelete(); @@ -118,4 +134,9 @@ public interface TransactionalBufferMetricsMXBean { void incrementWiDelete(); Long getWiInsert(); void incrementWiInsert(); + + Long getRTDDelete(); + void incrementRTDDelete(); + Long getRTDInsert(); + void incrementRTDInsert(); } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/LogMinerOracleConnectorIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/LogMinerOracleConnectorIT.java index 4d72968c4..9d0089103 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/LogMinerOracleConnectorIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/LogMinerOracleConnectorIT.java @@ -10,7 +10,12 @@ import org.junit.BeforeClass; import org.junit.Test; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; +import java.time.Duration; +import java.time.Instant; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; import java.time.temporal.ChronoField; @@ -31,6 +36,22 @@ public static void beforeSuperClass() throws SQLException { OracleConnectorIT.beforeClass(); } + @Test + public void shouldTakeTimeDifference() throws Exception { + String stmt = "select current_timestamp from dual"; + try (Connection conn = connection.connection(true); + PreparedStatement ps = conn.prepareStatement(stmt); + ResultSet rs = ps.executeQuery() + ) { + rs.next(); + java.sql.Timestamp ts = rs.getTimestamp(1); + Instant fromDb = ts.toInstant(); + Instant now = Instant.now(); + long diff = Duration.between(fromDb, now).toMillis(); + System.out.println("diff:" + diff); + } + } + @Test public void shouldTakeSnapshot() throws Exception { super.shouldTakeSnapshot(); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerMetricsTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerMetricsTest.java new file mode 100644 index 000000000..4debf38b8 --- /dev/null +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerMetricsTest.java @@ -0,0 +1,90 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer; + +import io.debezium.connector.common.CdcSourceTaskContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.concurrent.atomic.AtomicLong; + +import static org.fest.assertions.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +public class LogMinerMetricsTest { + + private LogMinerMetrics metrics; + + @Before + public void before() { + CdcSourceTaskContext taskContext = mock(CdcSourceTaskContext.class); + Mockito.when(taskContext.getConnectorName()).thenReturn("connector name"); + Mockito.when(taskContext.getConnectorType()).thenReturn("connector type"); + metrics = new LogMinerMetrics(taskContext); + } + + @Test + public void testMetrics() { + + metrics.incrementCapturedDmlCount(); + assertThat(metrics.getCapturedDmlCount() == 1).isTrue(); + + metrics.setCurrentScn(1000L); + assertThat(metrics.getCurrentScn() == 1000L).isTrue(); + + metrics.setBatchSize(10); + assertThat(metrics.getBatchSize() == 5_000).isTrue(); + metrics.setBatchSize(1_000_000); + assertThat(metrics.getBatchSize() == 5_000).isTrue(); + metrics.setBatchSize(6000); + assertThat(metrics.getBatchSize() == 6_000).isTrue(); + + assertThat(metrics.getMillisecondToSleepBetweenMiningQuery() == 1000).isTrue(); + metrics.changeSleepingTime(true); + assertThat(metrics.getMillisecondToSleepBetweenMiningQuery() == 1200).isTrue(); + metrics.changeSleepingTime(false); + assertThat(metrics.getMillisecondToSleepBetweenMiningQuery() == 1000).isTrue(); + metrics.setMillisecondToSleepBetweenMiningQuery(20); + assertThat(metrics.getMillisecondToSleepBetweenMiningQuery() == 1000).isTrue(); + metrics.setMillisecondToSleepBetweenMiningQuery(4000); + assertThat(metrics.getMillisecondToSleepBetweenMiningQuery() == 1000).isTrue(); + metrics.setMillisecondToSleepBetweenMiningQuery(2000); + assertThat(metrics.getMillisecondToSleepBetweenMiningQuery() == 2000).isTrue(); + + metrics.setLastLogMinerQueryDuration(Duration.ofMillis(100)); + assertThat(metrics.getLastLogMinerQueryDuration() == 100).isTrue(); + metrics.setLastLogMinerQueryDuration(Duration.ofMillis(200)); + assertThat(metrics.getLastLogMinerQueryDuration() == 200).isTrue(); + assertThat(metrics.getAverageLogMinerQueryDuration() == 150).isTrue(); + assertThat(metrics.getLogMinerQueryCount() == 2).isTrue(); + + metrics.setCurrentLogFileName(new HashSet<>(Arrays.asList("name","name1"))); + assertThat(metrics.getCurrentRedoLogFileName()[0].equals("name")).isTrue(); + assertThat(metrics.getCurrentRedoLogFileName()[1].equals("name1")).isTrue(); + + metrics.setSwitchCount(5); + assertThat(metrics.getSwitchCounter() == 5).isTrue(); + + metrics.setProcessedCapturedBatchDuration(Duration.ofMillis(1000)); + assertThat(metrics.getLastProcessedCapturedBatchDuration() == 1000).isTrue(); + assertThat(metrics.getProcessedCapturedBatchCount() == 1).isTrue(); + assertThat(metrics.getAverageProcessedCapturedBatchDuration() == 1000).isTrue(); + + metrics.setRedoLogStatus(Collections.singletonMap("name", "current")); + assertThat(metrics.getRedoLogStatus()[0].equals("name | current")).isTrue(); + + assertThat(metrics.toString().contains("logMinerQueryCount")); + } + +} diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetricsTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetricsTest.java new file mode 100644 index 000000000..856db102d --- /dev/null +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetricsTest.java @@ -0,0 +1,138 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer; + +import io.debezium.connector.common.CdcSourceTaskContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.time.Instant; +import java.util.concurrent.atomic.AtomicLong; + +import static org.fest.assertions.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +public class TransactionalBufferMetricsTest { + + private TransactionalBufferMetrics metrics; + + @Before + public void before() { + CdcSourceTaskContext taskContext = mock(CdcSourceTaskContext.class); + Mockito.when(taskContext.getConnectorName()).thenReturn("connector name"); + Mockito.when(taskContext.getConnectorType()).thenReturn("connector type"); + metrics = new TransactionalBufferMetrics(taskContext); + } + + @After + public void after() { + metrics.reset(); + } + + @Test + public void testLagMetrics() { + // no time difference between connector and database + long lag = metrics.getLagFromSource(); + assertThat(lag == 0).isTrue(); + Instant dbEventTime = Instant.now().minusMillis(2000); + metrics.calculateLagMetrics(dbEventTime); + lag = metrics.getLagFromSource(); + assertThat(lag == 2000).isTrue(); + assertThat(metrics.getMaxLagFromSource() == 2000).isTrue(); + assertThat(metrics.getMinLagFromSource() == 0).isTrue(); + assertThat(metrics.getAverageLagFromSource() == 2000).isTrue(); + + // not realistic scenario + dbEventTime = Instant.now().plusMillis(2000); + metrics.calculateLagMetrics(dbEventTime); + lag = metrics.getLagFromSource(); + assertThat(lag == -2000).isTrue(); + assertThat(metrics.getMaxLagFromSource() == 2000).isTrue(); + assertThat(metrics.getMinLagFromSource() == -2000).isTrue(); + assertThat(metrics.getAverageLagFromSource() == 0).isTrue(); + + metrics.reset(); + + // ########################## + // the database time is ahead + metrics.setTimeDifference(new AtomicLong(-1000)); + dbEventTime = Instant.now().minusMillis(2000); + metrics.calculateLagMetrics(dbEventTime); + lag = metrics.getLagFromSource(); + assertThat(lag == 3000).isTrue(); + assertThat(metrics.getMaxLagFromSource() == 3000).isTrue(); + assertThat(metrics.getMinLagFromSource() == 0).isTrue(); + assertThat(metrics.getAverageLagFromSource() == 3000).isTrue(); + + dbEventTime = Instant.now().minusMillis(3000); + metrics.calculateLagMetrics(dbEventTime); + lag = metrics.getLagFromSource(); + assertThat(lag == 4000).isTrue(); + assertThat(metrics.getMaxLagFromSource() == 4000).isTrue(); + assertThat(metrics.getMinLagFromSource() == 0).isTrue(); + assertThat(metrics.getAverageLagFromSource() == 3500).isTrue(); + + metrics.reset(); + + // ########################## + // the database time is behind + metrics.setTimeDifference(new AtomicLong(1000)); + dbEventTime = Instant.now().minusMillis(2000); + metrics.calculateLagMetrics(dbEventTime); + lag = metrics.getLagFromSource(); + assertThat(lag == 1000).isTrue(); + assertThat(metrics.getMaxLagFromSource() == 1000).isTrue(); + assertThat(metrics.getMinLagFromSource() == 0).isTrue(); + assertThat(metrics.getAverageLagFromSource() == 1000).isTrue(); + } + + @Test + public void testOthers() { + metrics.incrementScnFreezeCounter(); + assertThat(metrics.getScnFreezeCounter() == 1).isTrue(); + + metrics.incrementErrorCounter(); + assertThat(metrics.getErrorCounter() == 1).isTrue(); + + metrics.incrementWarningCounter(); + assertThat(metrics.getWarningCounter() == 1).isTrue(); + + metrics.incrementCommittedDmlCounter(5_000); + for (int i=0; i<1000; i++){ + metrics.incrementCapturedDmlCounter(); + metrics.incrementCommittedTransactions(); + } + assertThat(metrics.getCapturedDmlCount() == 1000).isTrue(); + assertThat(metrics.getCapturedDmlThroughput() > 10_000 && metrics.getCapturedDmlThroughput() < 1_000_000).isTrue(); + assertThat(metrics.getNumberOfCommittedTransactions() == 1000).isTrue(); + assertThat(metrics.getCommitThroughput() >= 1_000 && metrics.getCommitThroughput() <= 200_000).isTrue(); + + metrics.incrementRolledBackTransactions(); + assertThat(metrics.getNumberOfRolledBackTransactions() == 1).isTrue(); + + metrics.setActiveTransactions(5); + assertThat(metrics.getNumberOfActiveTransactions() == 5).isTrue(); + + metrics.addRolledBackTransactionId("rolledback id"); + assertThat(metrics.getNumberOfRolledBackTransactions() == 1).isTrue(); + assertThat(metrics.getRolledBackTransactionIds().contains("rolledback id")).isTrue(); + + metrics.addAbandonedTransactionId("abandoned id"); + assertThat(metrics.getAbandonedTransactionIds().size() == 1).isTrue(); + assertThat(metrics.getAbandonedTransactionIds().contains("abandoned id")).isTrue(); + + metrics.setOldestScn(10L); + assertThat(metrics.getOldestScn() == 10L).isTrue(); + + metrics.setCommittedScn(10L); + assertThat(metrics.getCommittedScn() == 10L).isTrue(); + + assertThat(metrics.toString().contains("capturedDmlCounter=1000")).isTrue(); + + } +} diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java index 205eb4c46..270ee53f8 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java @@ -15,6 +15,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import static org.mockito.Mockito.mock; import java.math.BigDecimal; import java.sql.Timestamp; @@ -70,7 +71,8 @@ public void before() { .maxQueueSize(DEFAULT_MAX_QUEUE_SIZE) .build(); errorHandler = new ErrorHandler(OracleConnector.class, SERVER_NAME, queue, () -> { }); - transactionalBuffer = new TransactionalBuffer(SERVER_NAME, errorHandler, null); + TransactionalBufferMetrics metrics = mock(TransactionalBufferMetrics.class); + transactionalBuffer = new TransactionalBuffer(SERVER_NAME, errorHandler, metrics); } @After diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java index f4d9a6b77..ac684f43d 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java @@ -19,20 +19,25 @@ public class TestHelper { - //private static final String HOST = "10.47.100.32"; //Roby +// private static final String HOST = "10.47.100.32"; //Roby private static final String HOST = "10.47.100.62"; //Development + private static final String SCHEMA_USER = "debezium"; + private static final String SCHEMA_PASS = "dbz"; + private static final String DATABASE = "ORA19C_PDB01"; //dev +// private static final String DATABASE = "ORCLPDB";//qa public static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-connect.txt").toAbsolutePath(); - //public static final String CONNECTOR_USER = "c##xstrm"; - public static final String CONNECTOR_USER = "c##logminer"; +// public static final String CONNECTOR_USER = "c##xstrm";// qa + public static final String CONNECTOR_USER = "c##logminer"; // dev +// public static final String CONNECTOR_USER_PASS = "xs"; //qa + public static final String CONNECTOR_USER_PASS = "lm"; //dev public static JdbcConfiguration defaultJdbcConfig() { return JdbcConfiguration.copy(Configuration.fromSystemProperties("database.")) .withDefault(JdbcConfiguration.HOSTNAME, HOST) .withDefault(JdbcConfiguration.PORT, 1521) .withDefault(JdbcConfiguration.USER, CONNECTOR_USER) - //.withDefault(JdbcConfiguration.PASSWORD, "xs") //Roby todo, revert - .withDefault(JdbcConfiguration.PASSWORD, "lm") //development + .withDefault(JdbcConfiguration.PASSWORD, CONNECTOR_USER_PASS) .withDefault(JdbcConfiguration.DATABASE, "ORA19C") .build(); } @@ -50,10 +55,10 @@ public static Configuration.Builder defaultConfig() { ); return builder.with(RelationalDatabaseConnectorConfig.SERVER_NAME, "server1") - .with(OracleConnectorConfig.PDB_NAME, "ORA19C_PDB01") + .with(OracleConnectorConfig.PDB_NAME, DATABASE) .with(OracleConnectorConfig.XSTREAM_SERVER_NAME, "dbzxout") .with(OracleConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class) - .with(OracleConnectorConfig.SCHEMA_NAME, "DEBEZIUM") + .with(OracleConnectorConfig.SCHEMA_NAME, SCHEMA_USER) .with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH); } @@ -92,9 +97,9 @@ private static JdbcConfiguration testJdbcConfig() { return JdbcConfiguration.copy(Configuration.fromSystemProperties("database.")) .withDefault(JdbcConfiguration.HOSTNAME, HOST) .withDefault(JdbcConfiguration.PORT, 1521) - .withDefault(JdbcConfiguration.USER, "debezium") - .withDefault(JdbcConfiguration.PASSWORD, "dbz") - .withDefault(JdbcConfiguration.DATABASE, "ORA19C_PDB01") + .withDefault(JdbcConfiguration.USER, SCHEMA_USER) + .withDefault(JdbcConfiguration.PASSWORD, SCHEMA_PASS) + .withDefault(JdbcConfiguration.DATABASE, DATABASE) .build(); } @@ -107,7 +112,7 @@ private static JdbcConfiguration adminJdbcConfig() { .withDefault(JdbcConfiguration.PORT, 1521) .withDefault(JdbcConfiguration.USER, "sys as sysdba") .withDefault(JdbcConfiguration.PASSWORD, "top_secret") - .withDefault(JdbcConfiguration.DATABASE, "ORA19C_PDB01") + .withDefault(JdbcConfiguration.DATABASE, DATABASE) .build(); }