From 4880c244d1a9a53fd4368d07ad5a9abd58cb8ab1 Mon Sep 17 00:00:00 2001 From: Ignatenko Andrey Date: Tue, 24 Mar 2020 14:57:08 -0700 Subject: [PATCH] Merge pull request #42 in N4FRA/debezium from DSCON-175_persist_transactional_buffer to master Squashed commit of the following: commit 374c32f8a8a11af620605ff93affe57c5a661eb9 Merge: a6fd82c9 43ce5a66 Author: AndreyIg Date: Tue Mar 24 14:53:56 2020 -0700 Merge branch 'master' into DSCON-175_persist_transactional_buffer commit 43ce5a66e46aaaf02957a28f10193a64051d38f3 Merge: 89feb149 ed0579ec Author: AndreyIg Date: Tue Mar 24 14:53:19 2020 -0700 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit a6fd82c9e1eb6ac4a60122103be8e07cde3401cf Author: AndreyIg Date: Tue Mar 24 14:48:41 2020 -0700 DSCON-175, persist transactional buffer commit 88a7202de2c2035c9ca4e81ce7f91c3bb3010aa9 Merge: 41151847 89feb149 Author: AndreyIg Date: Wed Mar 18 15:29:10 2020 -0700 Merge branch 'master' into DSCON-125_column_blacklist_Oracle commit 89feb149abcecfb539f78c4d8aa1ed9de9132f10 Merge: 50d11dda 6c113639 Author: AndreyIg Date: Wed Mar 18 15:25:50 2020 -0700 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit 50d11ddafee3cc605bad0740caa73775ee2bff4b Merge: 40dd12fa 9191658f Author: AndreyIg Date: Mon Mar 16 17:20:37 2020 -0700 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit 40dd12fa70fd3ef507ace5a52f41c8b48c4a9433 Merge: 0536eab2 2075352c Author: AndreyIg Date: Mon Mar 16 17:15:12 2020 -0700 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit 0536eab2fc68d097f4d355618acba05ea6dd0c07 Merge: 76f9f808 e30cfbd2 Author: AndreyIg Date: Thu Mar 12 13:31:51 2020 -0700 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit 76f9f8087d8249f224241df8b24c0285be238daf Merge: 77e567e5 af6f8a3b Author: AndreyIg Date: Wed Mar 11 12:33:09 2020 -0700 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit 77e567e595f0c5307127610c8b29c8697cc80878 Merge: 8e3d9224 0585b2b7 Author: AndreyIg Date: Mon Mar 9 12:07:58 2020 -0700 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit 8e3d92245229ca553969d069b803ce90d96a8e30 Merge: a98bb754 d4bc5286 Author: AndreyIg Date: Sat Mar 7 04:26:18 2020 -0800 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit 41151847286d2e0c1b6979dbde1420f0221a37cc Author: AndreyIg Date: Sat Mar 7 04:18:39 2020 -0800 DSCON-125, make column blacklist working for Oracle commit badff6d5dec76671afd4d9bd5568f1155a58cbb6 Merge: 2db5385f a98bb754 Author: AndreyIg Date: Sat Mar 7 04:18:19 2020 -0800 Merge branch 'master' into DSCON-125_column_blacklist_Oracle commit a98bb75401a6902b0576ec68c1247c5a7355af68 Merge: c78c3685 a23eb5a5 Author: AndreyIg Date: Sat Mar 7 04:12:31 2020 -0800 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit 2db5385f4bb688976a77b7ef6f6f8a401515216d Author: AndreyIg Date: Fri Mar 6 18:01:17 2020 -0800 DSCON-125, make column blacklist working for Oracle commit c78c36858d45b91eee837cf4189379c198667e47 Merge: 90bcc19c 4619fcd0 Author: AndreyIg Date: Fri Mar 6 06:52:42 2020 -0800 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit 90bcc19c13467ab8cc017c11140a646c39049064 Merge: b5d1ea79 3e3aeea3 Author: AndreyIg Date: Mon Mar 2 14:31:07 2020 -0800 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit b5d1ea790cd729289ac36d1b2c7a42ae8fb1a3ec Merge: 96860411 51f0dcb4 Author: AndreyIg Date: Wed Feb 26 17:17:38 2020 -0800 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit 96860411512d809463f0e884db25133583e1ef26 Merge: 926c648d 4996a49a Author: AndreyIg Date: Wed Feb 26 12:02:35 2020 -0800 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium commit 926c648dc23b5ecb90134e37b2d97c117fd543b2 Merge: 92140a3c 829206cb Author: AndreyIg Date: Wed Feb 26 10:49:29 2020 -0800 Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium ... and 23 more commits --- .../connector/oracle/OracleOffsetContext.java | 23 ++++- .../OracleSnapshotChangeEventSource.java | 1 + .../oracle/OracleSourceInfoStructMaker.java | 4 + .../debezium/connector/oracle/SourceInfo.java | 10 ++ .../oracle/jsqlparser/SimpleDmlParser.java | 2 +- .../LogMinerQueryResultProcessor.java | 26 +++-- .../oracle/logminer/TransactionalBuffer.java | 51 +++++----- .../logminer/TransactionalBufferMetrics.java | 33 +++++++ .../TransactionalBufferMetricsMXBean.java | 12 +++ .../connector/oracle/SourceInfoTest.java | 1 + .../logminer/TransactionalBufferTest.java | 95 ++++++++++++------- 11 files changed, 191 insertions(+), 67 deletions(-) diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java index edce7bb22..96d867c44 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java @@ -33,6 +33,11 @@ public class OracleOffsetContext implements OffsetContext { */ private boolean snapshotCompleted; + public OracleOffsetContext(OracleConnectorConfig connectorConfig, long scn, Long commitScn, LcrPosition lcrPosition, boolean snapshot, boolean snapshotCompleted) { + this(connectorConfig, scn, lcrPosition, snapshot, snapshotCompleted); + sourceInfo.setCommitScn(commitScn); + } + private OracleOffsetContext(OracleConnectorConfig connectorConfig, long scn, LcrPosition lcrPosition, boolean snapshot, boolean snapshotCompleted) { partition = Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName()); @@ -112,7 +117,12 @@ public static Builder create() { if (sourceInfo.getLcrPosition() != null) { return Collections.singletonMap(SourceInfo.LCR_POSITION_KEY, sourceInfo.getLcrPosition().toString()); } - return Collections.singletonMap(SourceInfo.SCN_KEY, sourceInfo.getScn()); + Map offset = new HashMap<>(); + + offset.put(SourceInfo.SCN_KEY, sourceInfo.getScn()); + offset.put(SourceInfo.COMMIT_SCN_KEY, sourceInfo.getCommitScn()); + + return offset; } } @@ -130,10 +140,18 @@ public void setScn(long scn) { sourceInfo.setScn(scn); } + public void setCommitScn(Long commitScn) { + sourceInfo.setCommitScn(commitScn); + } + public long getScn() { return sourceInfo.getScn(); } + public Long getCommitScn() { + return sourceInfo.getCommitScn(); + } + public void setLcrPosition(LcrPosition lcrPosition) { sourceInfo.setLcrPosition(lcrPosition); } @@ -216,7 +234,8 @@ public OffsetContext load(Map offset) { Long scn; if (adapter == OracleConnectorConfig.ConnectorAdapter.LOG_MINER){ scn = (Long) offset.get(SourceInfo.SCN_KEY); - return new OracleOffsetContext(connectorConfig, scn, null, snapshot, snapshotCompleted); + Long commitScn = (Long) offset.get(SourceInfo.COMMIT_SCN_KEY); + return new OracleOffsetContext(connectorConfig, scn, commitScn, null, snapshot, snapshotCompleted); } else { LcrPosition lcrPosition = LcrPosition.valueOf((String) offset.get(SourceInfo.LCR_POSITION_KEY)); scn = lcrPosition != null ? lcrPosition.getScn() : (Long) offset.get(SourceInfo.SCN_KEY); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java index 6e33da844..4b1642031 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java @@ -276,6 +276,7 @@ public static String buildSelectColumns(String blackListColumnStr, Table table) } return sb.toString(); }).collect(Collectors.joining(",")); + // todo this is an unnecessary code, fix unit test, then remove it String catalog = table.id().catalog(); List blackList = new ArrayList<>(Arrays.asList(blackListColumnStr.trim().toUpperCase().replaceAll(catalog + ".", "").split(","))); List allColumns = new ArrayList<>(Arrays.asList(allTableColumns.toUpperCase().split(","))); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSourceInfoStructMaker.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSourceInfoStructMaker.java index 5ece61a9c..1003aa7e5 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSourceInfoStructMaker.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSourceInfoStructMaker.java @@ -23,6 +23,7 @@ public OracleSourceInfoStructMaker(String connector, String version, CommonConne .field(SourceInfo.TABLE_NAME_KEY, Schema.STRING_SCHEMA) .field(SourceInfo.TXID_KEY, Schema.OPTIONAL_STRING_SCHEMA) .field(SourceInfo.SCN_KEY, Schema.OPTIONAL_INT64_SCHEMA) + .field(SourceInfo.COMMIT_SCN_KEY, Schema.OPTIONAL_INT64_SCHEMA) .field(SourceInfo.LCR_POSITION_KEY, Schema.OPTIONAL_STRING_SCHEMA) .build(); } @@ -43,6 +44,9 @@ public Struct struct(SourceInfo sourceInfo) { if (sourceInfo.getLcrPosition() != null) { ret.put(SourceInfo.LCR_POSITION_KEY, sourceInfo.getLcrPosition().toString()); } + if(sourceInfo.getCommitScn() != null) { + ret.put(SourceInfo.COMMIT_SCN_KEY, sourceInfo.getCommitScn()); + } return ret; } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java index 171408302..b5ca522a6 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java @@ -17,10 +17,12 @@ public class SourceInfo extends BaseSourceInfo { public static final String TXID_KEY = "txId"; public static final String SCN_KEY = "scn"; + public static final String COMMIT_SCN_KEY = "commit_scn"; public static final String LCR_POSITION_KEY = "lcr_position"; public static final String SNAPSHOT_KEY = "snapshot"; private long scn; + private Long commitScn; private LcrPosition lcrPosition; private String transactionId; private Instant sourceTime; @@ -34,10 +36,18 @@ public long getScn() { return scn; } + public Long getCommitScn() { + return commitScn; + } + public void setScn(long scn) { this.scn = scn; } + public void setCommitScn(Long commitScn) { + this.commitScn = commitScn; + } + public LcrPosition getLcrPosition() { return lcrPosition; } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/jsqlparser/SimpleDmlParser.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/jsqlparser/SimpleDmlParser.java index 41ed120be..e88b1141e 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/jsqlparser/SimpleDmlParser.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/jsqlparser/SimpleDmlParser.java @@ -89,7 +89,7 @@ public SimpleDmlParser(String catalogName, String schemaName, OracleChangeRecord public void parse(String dmlContent, Tables tables, String txId){ try { if (dmlContent == null) { - LOGGER.error("Cannot parse NULL , transaction: {}", txId); + LOGGER.warn("Cannot parse NULL , transaction: {}", txId); rowLCR = null; return; } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java index 9012593cf..debc2ed6b 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java @@ -98,8 +98,8 @@ public int processResult(ResultSet resultSet) { Timestamp changeTime = RowMapper.getChangeTime(resultSet); String txId = RowMapper.getTransactionId(resultSet); - String logMessage = String.format("transactionId = %s, SCN= %s, table_name= %s, segOwner= %s, operationCode=%s, offsetSCN= %s", - txId, scn, tableName, segOwner, operationCode, offsetContext.getScn()); + String logMessage = String.format("transactionId = %s, SCN= %s, table_name= %s, segOwner= %s, operationCode=%s, offsetSCN= %s, " + + " commitOffsetSCN= %s", txId, scn, tableName, segOwner, operationCode, offsetContext.getScn(), offsetContext.getCommitScn()); if (scn == null) { LOGGER.warn("Scn is null for {}", logMessage); @@ -108,7 +108,7 @@ public int processResult(ResultSet resultSet) { // Commit if (operationCode == RowMapper.COMMIT) { - if (transactionalBuffer.commit(txId, changeTime, context, logMessage)){ + if (transactionalBuffer.commit(txId, scn, offsetContext, changeTime, context, logMessage)){ LOGGER.trace("COMMIT, {}", logMessage); commitCounter++; cumulativeCommitTime = cumulativeCommitTime.plus(Duration.between(iterationStart, Instant.now())); @@ -174,7 +174,17 @@ public int processResult(ResultSet resultSet) { try { TableId tableId = RowMapper.getTableId(catalogName, resultSet); - transactionalBuffer.registerCommitCallback(txId, scn, changeTime.toInstant(), redo_sql, (timestamp, smallestScn) -> { + // todo delete after stowplan confirmation + if ("inv_unit_fcy_visit".equalsIgnoreCase(tableName)) { + if (operationCode == 1) { + transactionalBufferMetrics.incrementUfvInsert(); + } + if (operationCode == 2) { + transactionalBufferMetrics.incrementUfvDelete(); + } + } + + transactionalBuffer.registerCommitCallback(txId, scn, changeTime.toInstant(), redo_sql, (timestamp, smallestScn, commitScn, counter) -> { // update SCN in offset context only if processed SCN less than SCN among other transactions if (smallestScn == null || scn.compareTo(smallestScn) < 0) { offsetContext.setScn(scn.longValue()); @@ -183,6 +193,9 @@ public int processResult(ResultSet resultSet) { offsetContext.setTransactionId(txId); offsetContext.setSourceTime(timestamp.toInstant()); offsetContext.setTableId(tableId); + if (counter == 0){ + offsetContext.setCommitScn(commitScn.longValue()); + } Table table = schema.tableFor(tableId); LOGGER.trace("Processing DML event {} scn {}", rowLcr.toString(), scn); @@ -201,11 +214,12 @@ public int processResult(ResultSet resultSet) { metrics.setCapturedDmlCount(dmlCounter); if (dmlCounter > 0 || commitCounter > 0 || rollbackCounter > 0) { LOGGER.debug("{} DMLs, {} Commits, {} Rollbacks were processed in {} milliseconds, commit time:{}, rollback time: {}, parse time:{}, " + - "other time:{}, lag:{}, offset:{}", + "other time:{}, lag:{}, offset scn:{}, offset commit scn:{}, active transactions:{}", dmlCounter, commitCounter, rollbackCounter, (Duration.between(startTime, Instant.now()).toMillis()), cumulativeCommitTime.toMillis(), cumulativeRollbackTime.toMillis(), cumulativeParseTime.toMillis(), cumulativeOtherTime.toMillis(), - transactionalBufferMetrics.getLagFromSource(), offsetContext.getScn()); + transactionalBufferMetrics.getLagFromSource(), offsetContext.getScn(), + offsetContext.getCommitScn(), transactionalBufferMetrics.getNumberOfActiveTransactions()); } return dmlCounter; } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java index 74ec1514a..1607fd849 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java @@ -7,6 +7,7 @@ import io.debezium.annotation.NotThreadSafe; import io.debezium.connector.oracle.OracleConnector; +import io.debezium.connector.oracle.OracleOffsetContext; import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.source.spi.ChangeEventSource; import io.debezium.util.Threads; @@ -144,14 +145,6 @@ void registerCommitCallback(String transactionId, BigDecimal scn, Instant change return; } -// BigDecimal previousScn = transaction.redoSqlMap.floorKey(scn); -// if (previousScn != null) { -// if (transaction.redoSqlMap.get(previousScn) != null && transaction.redoSqlMap.get(previousScn).contains(redoSql)) { -// LOGGER.debug("Ignored duplicated capture for the previous SCN={}, REDO_SQL={}", scn, redoSql); -// return; -// } -// } - transaction.commitCallbacks.add(callback); transaction.addRedoSql(scn, redoSql); } @@ -163,39 +156,53 @@ void registerCommitCallback(String transactionId, BigDecimal scn, Instant change /** * @param transactionId transaction identifier + * @param commitScn SCN of the commit + * @param offsetContext Oracle offset * @param timestamp commit timestamp * @param context context to check that source is running * @param debugMessage todo delete - * @return true if committed transaction is in the buffer + * @return true if committed transaction is in the buffer and was not processed already */ - boolean commit(String transactionId, Timestamp timestamp, ChangeEventSource.ChangeEventSourceContext context, String debugMessage) { + boolean commit(String transactionId, BigDecimal commitScn, OracleOffsetContext offsetContext, Timestamp timestamp, + ChangeEventSource.ChangeEventSourceContext context, String debugMessage) { Transaction transaction = transactions.get(transactionId); - if (transaction != null) { - transaction.lastScn = transaction.lastScn.add(BigDecimal.ONE); - calculateLargestScn(); + if (transaction == null) { + return false; } + transaction.lastScn = transaction.lastScn.add(BigDecimal.ONE); + calculateLargestScn(); + transaction = transactions.remove(transactionId); BigDecimal smallestScn = calculateSmallestScn(); - if (transaction == null) { - return false; - } taskCounter.incrementAndGet(); abandonedTransactionIds.remove(transactionId); + if (offsetContext.getCommitScn() != null && offsetContext.getCommitScn() >= commitScn.longValue()) { + LOGGER.info("Transaction {} was already processed. Committed SCN in offset is {}, commit SCN of the transaction is {}", + transactionId, offsetContext.getCommitScn(), commitScn); + metrics.ifPresent(m -> m.setActiveTransactions(transactions.size())); + return false; + } + List commitCallbacks = transaction.commitCallbacks; - LOGGER.trace("COMMIT, {}, smallest SCN: {}, largest SCN {}", debugMessage, smallestScn, largestScn); + LOGGER.debug("COMMIT, {}, smallest SCN: {}, largest SCN {}", debugMessage, smallestScn, largestScn); executor.execute(() -> { try { + int counter = commitCallbacks.size(); for (CommitCallback callback : commitCallbacks) { if (!context.isRunning()) { return; } - callback.execute(timestamp, smallestScn); - metrics.ifPresent(TransactionalBufferMetrics::incrementCommittedTransactions); + callback.execute(timestamp, smallestScn, commitScn, --counter); } + + metrics.ifPresent(TransactionalBufferMetrics::incrementCommittedTransactions); + metrics.ifPresent(m -> m.setActiveTransactions(transactions.size())); + metrics.ifPresent(m -> m.incrementCommittedDmlCounter(commitCallbacks.size())); + metrics.ifPresent(m -> m.setCommittedScn(commitScn.longValue())); } catch (InterruptedException e) { LOGGER.error("Thread interrupted during running", e); Thread.currentThread().interrupt(); @@ -203,8 +210,6 @@ boolean commit(String transactionId, Timestamp timestamp, ChangeEventSource.Chan errorHandler.setProducerThrowable(e); } finally { taskCounter.decrementAndGet(); - metrics.ifPresent(m -> m.setActiveTransactions(transactions.size())); - metrics.ifPresent(m -> m.incrementCommittedDmlCounter(commitCallbacks.size())); } }); @@ -340,8 +345,10 @@ public interface CommitCallback { * * @param timestamp commit timestamp * @param smallestScn smallest SCN among other transactions + * @param commitScn commit SCN + * @param callbackNumber number of the callback in the transaction */ - void execute(Timestamp timestamp, BigDecimal smallestScn) throws InterruptedException; + void execute(Timestamp timestamp, BigDecimal smallestScn, BigDecimal commitScn, int callbackNumber) throws InterruptedException; } @NotThreadSafe diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetrics.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetrics.java index 1c4844836..5b9e99c93 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetrics.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetrics.java @@ -23,6 +23,7 @@ @ThreadSafe public class TransactionalBufferMetrics extends Metrics implements TransactionalBufferMetricsMXBean { private AtomicLong oldestScn = new AtomicLong(); + private AtomicLong committedScn = new AtomicLong(); private AtomicReference lagFromTheSource = new AtomicReference<>(); private AtomicInteger activeTransactions = new AtomicInteger(); private AtomicLong rolledBackTransactions = new AtomicLong(); @@ -37,10 +38,32 @@ public class TransactionalBufferMetrics extends Metrics implements Transactional private Instant startTime; private static long MILLIS_PER_SECOND = 1000L; + + // temp todo delete after stowplan testing + private Long ufvDelete = 0L; + private Long ufvInsert = 0L; + + public Long getUfvDelete() { + return ufvDelete; + } + + public void incrementUfvDelete() { + this.ufvDelete++; + } + + public Long getUfvInsert() { + return ufvInsert; + } + + public void incrementUfvInsert() { + this.ufvInsert++; + } + TransactionalBufferMetrics(CdcSourceTaskContext taskContext) { super(taskContext, "log-miner-transactional-buffer"); startTime = Instant.now(); oldestScn.set(-1); + committedScn.set(-1); lagFromTheSource.set(Duration.ZERO); reset(); } @@ -50,6 +73,10 @@ void setOldestScn(Long scn){ oldestScn.set(scn); } + public void setCommittedScn(Long scn){ + committedScn.set(scn); + } + // todo deal with timezones void setLagFromTheSource(Instant changeTime){ if (changeTime != null) { @@ -108,6 +135,11 @@ public Long getOldestScn() { return oldestScn.get(); } + @Override + public Long getCommittedScn() { + return committedScn.get(); + } + @Override public int getNumberOfActiveTransactions() { return activeTransactions.get(); @@ -182,6 +214,7 @@ public void reset() { public String toString() { return "TransactionalBufferMetrics{" + "oldestScn=" + oldestScn.get() + + ", committedScn=" + committedScn.get() + ", lagFromTheSource=" + lagFromTheSource.get() + ", activeTransactions=" + activeTransactions.get() + ", rolledBackTransactions=" + rolledBackTransactions.get() + diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetricsMXBean.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetricsMXBean.java index 3f8d257b0..ea52fdc62 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetricsMXBean.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetricsMXBean.java @@ -54,6 +54,12 @@ public interface TransactionalBufferMetricsMXBean { */ Long getOldestScn(); + /** + * It shows last committed SCN + * @return committed SCN + */ + Long getCommittedScn(); + /** * This is to get the lag between latest captured change timestamp in REDO LOG and time of it's placement in the buffer * @return lag in milliseconds @@ -95,4 +101,10 @@ public interface TransactionalBufferMetricsMXBean { * action to reset some metrics */ void reset(); + + // todo delete after stowplan test + Long getUfvDelete(); + void incrementUfvDelete(); + Long getUfvInsert(); + void incrementUfvInsert(); } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java index cfe38de4c..7091f59c4 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java @@ -60,6 +60,7 @@ public void schemaIsCorrect() { .field("table", Schema.STRING_SCHEMA) .field("txId", Schema.OPTIONAL_STRING_SCHEMA) .field("scn", Schema.OPTIONAL_INT64_SCHEMA) + .field("commit_scn", Schema.OPTIONAL_INT64_SCHEMA) .field("lcr_position", Schema.OPTIONAL_STRING_SCHEMA) .build(); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java index 1044d20f9..4d3f0b7cb 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java @@ -5,8 +5,11 @@ */ package io.debezium.connector.oracle.logminer; +import io.debezium.config.Configuration; import io.debezium.connector.base.ChangeEventQueue; import io.debezium.connector.oracle.OracleConnector; +import io.debezium.connector.oracle.OracleConnectorConfig; +import io.debezium.connector.oracle.OracleOffsetContext; import io.debezium.pipeline.DataChangeEvent; import io.debezium.pipeline.ErrorHandler; import org.junit.After; @@ -18,6 +21,7 @@ import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; @@ -41,6 +45,19 @@ public class TransactionalBufferTest { private static final BigDecimal OTHER_SCN = BigDecimal.TEN; private static final BigDecimal LARGEST_SCN = BigDecimal.valueOf(100L); private static final Timestamp TIMESTAMP = new Timestamp(System.currentTimeMillis()); + private static final Configuration config = new Configuration() { + @Override + public Set keys() { + return null; + } + + @Override + public String getString(String key) { + return null; + } + }; + private static final OracleConnectorConfig connectorConfig = new OracleConnectorConfig(config); + private static OracleOffsetContext offsetContext; private ErrorHandler errorHandler; private TransactionalBuffer transactionalBuffer; @@ -69,22 +86,24 @@ public void testIsEmpty() { @Test public void testIsNotEmptyWhenTransactionIsRegistered() { - transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> { }); + transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> { }); assertThat(transactionalBuffer.isEmpty()).isEqualTo(false); } @Test public void testIsNotEmptyWhenTransactionIsCommitting() { - transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> Thread.sleep(1000)); - transactionalBuffer.commit(TRANSACTION_ID, TIMESTAMP, () -> true, MESSAGE); + transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> Thread.sleep(1000)); + offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), null, false, true); + transactionalBuffer.commit(TRANSACTION_ID, SCN.add(BigDecimal.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE); assertThat(transactionalBuffer.isEmpty()).isEqualTo(false); } @Test public void testIsEmptyWhenTransactionIsCommitted() throws InterruptedException { CountDownLatch commitLatch = new CountDownLatch(1); - transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> commitLatch.countDown()); - transactionalBuffer.commit(TRANSACTION_ID, TIMESTAMP, () -> true, MESSAGE); + transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> commitLatch.countDown()); + offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), null, false, true); + transactionalBuffer.commit(TRANSACTION_ID, SCN.add(BigDecimal.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE); commitLatch.await(); Thread.sleep(1000); assertThat(transactionalBuffer.isEmpty()).isEqualTo(true); @@ -92,7 +111,7 @@ public void testIsEmptyWhenTransactionIsCommitted() throws InterruptedException @Test public void testIsEmptyWhenTransactionIsRolledBack() { - transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> { }); + transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> { }); transactionalBuffer.rollback(TRANSACTION_ID, ""); assertThat(transactionalBuffer.isEmpty()).isEqualTo(true); assertThat(transactionalBuffer.getLargestScn()).isEqualTo(SCN.add(BigDecimal.ONE)); @@ -100,8 +119,8 @@ public void testIsEmptyWhenTransactionIsRolledBack() { @Test public void testNonEmptyFirstTransactionIsRolledBack() { - transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "insert", (timestamp, smallestScn) -> { }); - transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn) -> { }); + transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "insert", (timestamp, smallestScn, commitScn, counter) -> { }); + transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> { }); transactionalBuffer.rollback(TRANSACTION_ID, ""); assertThat(transactionalBuffer.isEmpty()).isEqualTo(false); assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); @@ -111,8 +130,8 @@ public void testNonEmptyFirstTransactionIsRolledBack() { @Test public void testNonEmptySecondTransactionIsRolledBack() { - transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> { }); - transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn) -> { }); + transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> { }); + transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> { }); transactionalBuffer.rollback(OTHER_TRANSACTION_ID, ""); assertThat(transactionalBuffer.isEmpty()).isEqualTo(false); assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN.add(BigDecimal.ONE)); @@ -124,12 +143,13 @@ public void testNonEmptySecondTransactionIsRolledBack() { public void testCalculateScnWhenTransactionIsCommitted() throws InterruptedException { CountDownLatch commitLatch = new CountDownLatch(1); AtomicReference smallestScnContainer = new AtomicReference<>(); - transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> { + transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> { smallestScnContainer.set(smallestScn); commitLatch.countDown(); }); assertThat(transactionalBuffer.getLargestScn()).isEqualTo(SCN); // before commit - transactionalBuffer.commit(TRANSACTION_ID, TIMESTAMP, () -> true, MESSAGE); + offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), null, false, true); + transactionalBuffer.commit(TRANSACTION_ID, SCN.add(BigDecimal.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE); commitLatch.await(); assertThat(transactionalBuffer.getLargestScn()).isEqualTo(SCN.add(BigDecimal.ONE)); // after commit @@ -142,13 +162,14 @@ public void testCalculateScnWhenTransactionIsCommitted() throws InterruptedExcep public void testCalculateScnWhenFirstTransactionIsCommitted() throws InterruptedException { CountDownLatch commitLatch = new CountDownLatch(1); AtomicReference smallestScnContainer = new AtomicReference<>(); - transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> { + transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> { smallestScnContainer.set(smallestScn); commitLatch.countDown(); }); - transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn) -> { }); + transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> { }); assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); // before commit - transactionalBuffer.commit(TRANSACTION_ID, TIMESTAMP, () -> true, MESSAGE); + offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), null, false, true); + transactionalBuffer.commit(TRANSACTION_ID, SCN.add(BigDecimal.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE); commitLatch.await(); // after commit, it stays the same because OTHER_TRANSACTION_ID is not committed yet assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); @@ -159,15 +180,16 @@ public void testCalculateScnWhenFirstTransactionIsCommitted() throws Interrupted @Test public void testCalculateScnWhenSecondTransactionIsCommitted() throws InterruptedException { - transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> { }); + transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> { }); CountDownLatch commitLatch = new CountDownLatch(1); AtomicReference smallestScnContainer = new AtomicReference<>(); - transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn) -> { + transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> { smallestScnContainer.set(smallestScn); commitLatch.countDown(); }); assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); // before commit - transactionalBuffer.commit(OTHER_TRANSACTION_ID, TIMESTAMP, () -> true, MESSAGE); + offsetContext = new OracleOffsetContext(connectorConfig, OTHER_SCN.longValue(), OTHER_SCN.longValue(), null, false, true); + transactionalBuffer.commit(OTHER_TRANSACTION_ID, OTHER_SCN.add(BigDecimal.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE); commitLatch.await(); assertThat(smallestScnContainer.get()).isEqualTo(SCN); // after committing OTHER_TRANSACTION_ID @@ -177,10 +199,11 @@ public void testCalculateScnWhenSecondTransactionIsCommitted() throws Interrupte @Test public void testResetLargestScn() { - transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> { }); - transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn) -> {}); + transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> { }); + transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> {}); assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); // before commit - transactionalBuffer.commit(OTHER_TRANSACTION_ID, TIMESTAMP, () -> true, MESSAGE); + offsetContext = new OracleOffsetContext(connectorConfig, OTHER_SCN.longValue(), OTHER_SCN.longValue(), null, false, true); + transactionalBuffer.commit(OTHER_TRANSACTION_ID, OTHER_SCN, offsetContext, TIMESTAMP, () -> true, MESSAGE); assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN.add(BigDecimal.ONE)); // after commit transactionalBuffer.resetLargestScn(null); @@ -191,7 +214,7 @@ public void testResetLargestScn() { @Test public void testAbandoningOneTransaction() { - transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> {}); + transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> {}); transactionalBuffer.abandonLongTransactions(SCN.longValue()); assertThat(transactionalBuffer.isEmpty()).isEqualTo(true); assertThat(transactionalBuffer.getLargestScn()).isEqualTo(BigDecimal.ZERO); @@ -199,8 +222,8 @@ public void testAbandoningOneTransaction() { @Test public void testAbandoningTransactionHavingAnotherOne() { - transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn) -> {}); - transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn) -> {}); + transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> {}); + transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> {}); transactionalBuffer.abandonLongTransactions(SCN.longValue()); assertThat(transactionalBuffer.isEmpty()).isEqualTo(false); assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); @@ -208,9 +231,9 @@ public void testAbandoningTransactionHavingAnotherOne() { @Test public void testTransactionDump() { - transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), SQL_ONE, (timestamp, smallestScn) -> {}); - transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), SQL_ONE, (timestamp, smallestScn) -> {}); - transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), SQL_TWO, (timestamp, smallestScn) -> {}); + transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), SQL_ONE, (timestamp, smallestScn, commitScn, counter) -> {}); + transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), SQL_ONE, (timestamp, smallestScn, commitScn, counter) -> {}); + transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), SQL_TWO, (timestamp, smallestScn, commitScn, counter) -> {}); assertThat(transactionalBuffer.toString()).contains(SQL_ONE); assertThat(transactionalBuffer.toString()).contains(SQL_TWO); } @@ -224,30 +247,30 @@ public void testDuplicatedRedoSql() { final String anotherInsertIntoATable = "another insert into a table"; final String duplicatedInsertIntoATable = "duplicated insert into a table"; - transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), insertIntoATable, (timestamp, smallestScn) -> { }); - transactionalBuffer.registerCommitCallback(TRANSACTION_ID, OTHER_SCN, Instant.now(), anotherInsertIntoATable, (timestamp, smallestScn) -> { }); + transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), insertIntoATable, (timestamp, smallestScn, commitScn, counter) -> { }); + transactionalBuffer.registerCommitCallback(TRANSACTION_ID, OTHER_SCN, Instant.now(), anotherInsertIntoATable, (timestamp, smallestScn, commitScn, counter) -> { }); assertThat(transactionalBuffer.getLargestScn().equals(OTHER_SCN)); assertThat(transactionalBuffer.toString().contains(insertIntoATable)); assertThat(transactionalBuffer.toString().contains(anotherInsertIntoATable)); transactionalBuffer.rollback(TRANSACTION_ID, ""); // duplications are OK in different transactions - transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), duplicatedInsertIntoATable, (timestamp, smallestScn) -> { }); - transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, SCN, Instant.now(), duplicatedInsertIntoATable, (timestamp, smallestScn) -> { }); + transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), duplicatedInsertIntoATable, (timestamp, smallestScn, commitScn, counter) -> { }); + transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, SCN, Instant.now(), duplicatedInsertIntoATable, (timestamp, smallestScn, commitScn, counter) -> { }); assertThat(transactionalBuffer.toString().indexOf(duplicatedInsertIntoATable) != transactionalBuffer.toString().lastIndexOf(duplicatedInsertIntoATable)); transactionalBuffer.rollback(TRANSACTION_ID, ""); transactionalBuffer.rollback(OTHER_TRANSACTION_ID, ""); // duplications are NOT OK in a transactions for different SCNs if they are sequential - transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), duplicatedInsertIntoATable, (timestamp, smallestScn) -> {}); - transactionalBuffer.registerCommitCallback(TRANSACTION_ID, OTHER_SCN, Instant.now(), duplicatedInsertIntoATable, (timestamp, smallestScn) -> {}); + transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), duplicatedInsertIntoATable, (timestamp, smallestScn, commitScn, counter) -> {}); + transactionalBuffer.registerCommitCallback(TRANSACTION_ID, OTHER_SCN, Instant.now(), duplicatedInsertIntoATable, (timestamp, smallestScn, commitScn, counter) -> {}); assertThat(transactionalBuffer.toString().indexOf(duplicatedInsertIntoATable) == transactionalBuffer.toString().lastIndexOf(duplicatedInsertIntoATable)); transactionalBuffer.rollback(TRANSACTION_ID, ""); // duplications are OK in a transactions for different SCNs if they are NOT sequential - transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), duplicatedInsertIntoATable, (timestamp, smallestScn) -> {}); - transactionalBuffer.registerCommitCallback(TRANSACTION_ID, OTHER_SCN, Instant.now(), insertIntoATable, (timestamp, smallestScn) -> {}); - transactionalBuffer.registerCommitCallback(TRANSACTION_ID, LARGEST_SCN, Instant.now(), duplicatedInsertIntoATable, (timestamp, smallestScn) -> {}); + transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), duplicatedInsertIntoATable, (timestamp, smallestScn, commitScn, counter) -> {}); + transactionalBuffer.registerCommitCallback(TRANSACTION_ID, OTHER_SCN, Instant.now(), insertIntoATable, (timestamp, smallestScn, commitScn, counter) -> {}); + transactionalBuffer.registerCommitCallback(TRANSACTION_ID, LARGEST_SCN, Instant.now(), duplicatedInsertIntoATable, (timestamp, smallestScn, commitScn, counter) -> {}); assertThat(transactionalBuffer.toString().indexOf(duplicatedInsertIntoATable) != transactionalBuffer.toString().lastIndexOf(duplicatedInsertIntoATable)); transactionalBuffer.rollback(TRANSACTION_ID, ""); }