From dfe77f6d85c28ad8ef97447105a1480ce43bee8a Mon Sep 17 00:00:00 2001 From: "Andrey V. Mashenkov" Date: Thu, 20 Dec 2018 20:32:19 +0300 Subject: [PATCH 1/8] IGNITE-10730: WIP. --- .../pagemem/wal/record/MvccDataRecord.java | 7 +- .../processors/cache/GridCacheEntryEx.java | 7 +- .../processors/cache/GridCacheMapEntry.java | 72 ++++++++----------- .../cache/transactions/IgniteTxAdapter.java | 30 +++++++- .../cache/transactions/IgniteTxManager.java | 5 +- .../cache/GridCacheTestEntryEx.java | 7 +- 6 files changed, 76 insertions(+), 52 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataRecord.java index 276ba1bc05498..4d98b990c5a0e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataRecord.java @@ -43,7 +43,7 @@ public MvccDataRecord(MvccDataEntry writeEntry) { /** * @param writeEntries Write entries. */ - public MvccDataRecord(List writeEntries) { + public MvccDataRecord(List writeEntries) { this(writeEntries, U.currentTimeMillis()); } @@ -58,8 +58,9 @@ public MvccDataRecord(MvccDataEntry writeEntry, long timestamp) { * @param writeEntries Write entries. * @param timestamp TimeStamp. */ - public MvccDataRecord(List writeEntries, long timestamp) { - super(writeEntries, timestamp); + @SuppressWarnings("unchecked") + public MvccDataRecord(List writeEntries, long timestamp) { + super((List)writeEntries, timestamp); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 8cef1763af868..13a8f67a25d59 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; @@ -370,7 +371,7 @@ public EntryGetResult innerGetAndReserveForLoad(boolean updateMetrics, * @throws GridCacheEntryRemovedException If entry has been removed. */ public GridCacheUpdateTxResult mvccSet( - @Nullable IgniteInternalTx tx, + IgniteTxAdapter tx, UUID affNodeId, CacheObject val, EntryProcessor entryProc, @@ -400,7 +401,7 @@ public GridCacheUpdateTxResult mvccSet( * @throws GridCacheEntryRemovedException If entry has been removed. */ public GridCacheUpdateTxResult mvccRemove( - @Nullable IgniteInternalTx tx, + IgniteTxAdapter tx, UUID affNodeId, AffinityTopologyVersion topVer, MvccSnapshot mvccVer, @@ -1195,7 +1196,7 @@ public void updateIndex(SchemaIndexCacheFilter filter, SchemaIndexCacheVisitorCl * @throws GridCacheEntryRemovedException, If entry has been removed. */ public GridCacheUpdateTxResult mvccUpdateRowsWithPreloadInfo( - IgniteInternalTx tx, + IgniteTxAdapter tx, UUID affNodeId, AffinityTopologyVersion topVer, List entries, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 881e8688411a0..d6a7ea571a7f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -65,6 +65,7 @@ import org.apache.ignite.internal.processors.cache.persistence.StorageException; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter; @@ -1082,7 +1083,7 @@ protected void recordNodeId(UUID nodeId, AffinityTopologyVersion topVer) { /** {@inheritDoc} */ @Override public final GridCacheUpdateTxResult mvccSet( - IgniteInternalTx tx, + IgniteTxAdapter tx, UUID affNodeId, CacheObject val, EntryProcessor entryProc, @@ -1104,8 +1105,6 @@ protected void recordNodeId(UUID nodeId, AffinityTopologyVersion topVer) { final GridCacheVersion newVer; - WALPointer logPtr = null; - ensureFreeSpace(); lockEntry(); @@ -1221,7 +1220,7 @@ else if (res.resultType() == ResultType.REMOVED_NOT_NULL) { } if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) { - logPtr = cctx.shared().wal().log(new MvccDataRecord(new MvccDataEntry( + tx.logMvccEntry(new MvccDataEntry( cctx.cacheId(), key, val, @@ -1233,7 +1232,7 @@ else if (res.resultType() == ResultType.REMOVED_NOT_NULL) { key.partition(), 0L, mvccVer) - )); + ); } update(val, expireTime, ttl, newVer, true); @@ -1250,8 +1249,8 @@ else if (res.resultType() == ResultType.REMOVED_NOT_NULL) { onUpdateFinished(0L); - GridCacheUpdateTxResult updRes = valid ? new GridCacheUpdateTxResult(true, 0L, logPtr) : - new GridCacheUpdateTxResult(false, logPtr); + GridCacheUpdateTxResult updRes = valid ? new GridCacheUpdateTxResult(true, 0L, null) : + new GridCacheUpdateTxResult(false, (WALPointer)null); if (retVal && (res.resultType() == ResultType.PREV_NOT_NULL || res.resultType() == ResultType.VERSION_FOUND)) updRes.prevValue(res.oldValue()); @@ -1275,7 +1274,7 @@ else if (res.resultType() == ResultType.REMOVED_NOT_NULL) { /** {@inheritDoc} */ @Override public final GridCacheUpdateTxResult mvccRemove( - IgniteInternalTx tx, + IgniteTxAdapter tx, UUID affNodeId, AffinityTopologyVersion topVer, MvccSnapshot mvccVer, @@ -1290,8 +1289,6 @@ else if (res.resultType() == ResultType.REMOVED_NOT_NULL) { final GridCacheVersion newVer; - WALPointer logPtr = null; - lockEntry(); MvccUpdateResult res; @@ -1350,7 +1347,7 @@ else if (res.resultType() == ResultType.LOCKED) { } if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) - logPtr = logMvccUpdate(tx, null, 0, 0L, mvccVer); + logMvccUpdate(tx, null, 0, 0L, mvccVer); update(null, 0, 0, newVer, true); @@ -1366,8 +1363,8 @@ else if (res.resultType() == ResultType.LOCKED) { onUpdateFinished(0L); - GridCacheUpdateTxResult updRes = valid ? new GridCacheUpdateTxResult(true, 0L, logPtr) : - new GridCacheUpdateTxResult(false, logPtr); + GridCacheUpdateTxResult updRes = valid ? new GridCacheUpdateTxResult(true, 0L, null) : + new GridCacheUpdateTxResult(false, (WALPointer) null); if(retVal && (res.resultType() == ResultType.PREV_NOT_NULL || res.resultType() == ResultType.VERSION_FOUND)) updRes.prevValue(res.oldValue()); @@ -1391,8 +1388,6 @@ else if (res.resultType() == ResultType.LOCKED) { final GridCacheVersion newVer; - WALPointer logPtr = null; - lockEntry(); try { @@ -1434,7 +1429,7 @@ else if (res.resultType() == ResultType.LOCKED) { onUpdateFinished(0L); - return new GridCacheUpdateTxResult(valid, logPtr); + return new GridCacheUpdateTxResult(valid, (WALPointer)null); } /** {@inheritDoc} */ @@ -4374,7 +4369,7 @@ protected WALPointer logTxUpdate(IgniteInternalTx tx, CacheObject val, long expi * @param mvccVer Mvcc version. * @throws IgniteCheckedException In case of log failure. */ - protected WALPointer logMvccUpdate(IgniteInternalTx tx, CacheObject val, long expireTime, long updCntr, + protected WALPointer logMvccUpdate(IgniteTxAdapter tx, CacheObject val, long expireTime, long updCntr, MvccSnapshot mvccVer) throws IgniteCheckedException { assert mvccVer != null; @@ -4387,7 +4382,7 @@ protected WALPointer logMvccUpdate(IgniteInternalTx tx, CacheObject val, long ex else op = this.val == null ? GridCacheOperation.CREATE : GridCacheOperation.UPDATE; - return cctx.shared().wal().log(new MvccDataRecord(new MvccDataEntry( + tx.logMvccEntry(new MvccDataEntry( cctx.cacheId(), key, val, @@ -4397,10 +4392,10 @@ protected WALPointer logMvccUpdate(IgniteInternalTx tx, CacheObject val, long ex expireTime, key.partition(), updCntr, - mvccVer))); + mvccVer)); } - else - return null; + + return null; } /** @@ -5087,7 +5082,7 @@ private static class MvccRemoveLockListener implements IgniteInClosure runEntryProcessor(CacheInvokeEntry entries, @@ -6731,8 +6723,6 @@ private IgniteBiTuple runEntryProcessor(CacheInvokeEntry runEntryProcessor(CacheInvokeEntry runEntryProcessor(CacheInvokeEntry runEntryProcessor(CacheInvokeEntry TX_COUNTERS_UPD = AtomicReferenceFieldUpdater.newUpdater(IgniteTxAdapter.class, TxCounters.class, "txCounters"); + public static final int MVCC_WAL_RECORD_BUFFER_SIZE = 20; + /** Logger. */ protected static IgniteLogger log; @@ -278,6 +282,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement @GridToStringInclude protected volatile MvccSnapshot mvccSnapshot; + @GridToStringExclude + private List enlistBuffer; + /** Rollback finish future. */ @GridToStringExclude private volatile IgniteInternalFuture rollbackFut; @@ -418,7 +425,7 @@ public void setParentTx(GridNearTxLocal parentTx) { /** * @return Mvcc info. */ - @Override @Nullable public MvccSnapshot mvccSnapshot() { + @Override public @Nullable MvccSnapshot mvccSnapshot() { return mvccSnapshot; } @@ -427,6 +434,27 @@ public void setParentTx(GridNearTxLocal parentTx) { this.mvccSnapshot = mvccSnapshot; } + + public void logMvccEntry(MvccDataEntry entry) throws IgniteCheckedException { + if (isRollbackOnly()) + return; // Noop. + + if (enlistBuffer == null) + enlistBuffer = new ArrayList<>(MVCC_WAL_RECORD_BUFFER_SIZE); + + enlistBuffer.add(entry); + + if (enlistBuffer.size() == MVCC_WAL_RECORD_BUFFER_SIZE) + flushEnlistBuffer(); + } + + public void flushEnlistBuffer() throws IgniteCheckedException { + if (!isRollbackOnly() && enlistBuffer != null) + cctx.wal().log(new MvccDataRecord(enlistBuffer)); + + enlistBuffer = null; + } + /** * @return Shared cache context. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index ce914e8a98865..76c11fa9bdf2f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -2454,8 +2454,11 @@ public void mvccPrepare(IgniteTxAdapter tx) throws IgniteCheckedException { cctx.database().checkpointReadLock(); try { - if (cctx.wal() != null) + if (cctx.wal() != null) { + tx.flushEnlistBuffer(); + cctx.wal().log(newTxRecord(tx)); + } cctx.coordinators().updateState(tx.mvccSnapshot, TxState.PREPARED); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index 358dfc391ae01..e93e9a28a896e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; @@ -488,7 +489,7 @@ void recheckLock() { } /** {@inheritDoc} */ - @Override public GridCacheUpdateTxResult mvccSet(@Nullable IgniteInternalTx tx, UUID affNodeId, CacheObject val, + @Override public GridCacheUpdateTxResult mvccSet(IgniteTxAdapter tx, UUID affNodeId, CacheObject val, EntryProcessor entryProc, Object[] invokeArgs, long ttl0, AffinityTopologyVersion topVer, MvccSnapshot mvccVer, GridCacheOperation op, boolean needHistory, boolean noCreate, boolean needOldVal, CacheEntryPredicate filter, boolean retVal) throws IgniteCheckedException, GridCacheEntryRemovedException { @@ -498,7 +499,7 @@ void recheckLock() { } /** {@inheritDoc} */ - @Override public GridCacheUpdateTxResult mvccRemove(@Nullable IgniteInternalTx tx, UUID affNodeId, + @Override public GridCacheUpdateTxResult mvccRemove(IgniteTxAdapter tx, UUID affNodeId, AffinityTopologyVersion topVer, MvccSnapshot mvccVer, boolean needHistory, boolean needOldVal, CacheEntryPredicate filter, boolean retVal) throws IgniteCheckedException, GridCacheEntryRemovedException { @@ -951,7 +952,7 @@ GridCacheMvccCandidate anyOwner() { } /** {@inheritDoc} */ - @Override public GridCacheUpdateTxResult mvccUpdateRowsWithPreloadInfo(IgniteInternalTx tx, + @Override public GridCacheUpdateTxResult mvccUpdateRowsWithPreloadInfo(IgniteTxAdapter tx, UUID affNodeId, AffinityTopologyVersion topVer, List entries, From 2c2abd3313484f69cdb5dd5df5e0e2be09e3c979 Mon Sep 17 00:00:00 2001 From: "Andrey V. Mashenkov" Date: Fri, 21 Dec 2018 12:40:07 +0300 Subject: [PATCH 2/8] IGNITE-10730: Minor. --- .../reader/StandaloneWalRecordsIterator.java | 5 +++-- .../serializer/RecordDataV2Serializer.java | 21 +++++++++++-------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java index 238f99ae7781e..f2ebdf9758a4c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java @@ -394,7 +394,8 @@ private boolean checkBounds(long idx) { * @return post-processed record. * @throws IgniteCheckedException if failed. */ - @NotNull private WALRecord postProcessDataRecord( + @SuppressWarnings("unchecked") + private @NotNull WALRecord postProcessDataRecord( @NotNull DataRecord dataRec, GridKernalContext kernalCtx, IgniteCacheObjectProcessor processor @@ -412,7 +413,7 @@ private boolean checkBounds(long idx) { } DataRecord res = dataRec instanceof MvccDataRecord ? - new MvccDataRecord(postProcessedEntries, dataRec.timestamp()) : + new MvccDataRecord((List)postProcessedEntries, dataRec.timestamp()) : new DataRecord(postProcessedEntries, dataRec.timestamp()); res.size(dataRec.size()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java index dd25b8cca4049..91d784cb3a600 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java @@ -141,7 +141,7 @@ public RecordDataV2Serializer(GridCacheSharedContext cctx) { return cpRec; - case DATA_RECORD: + case DATA_RECORD: { int entryCnt = in.readInt(); long timeStamp = in.readLong(); @@ -151,28 +151,31 @@ public RecordDataV2Serializer(GridCacheSharedContext cctx) { entries.add(readPlainDataEntry(in)); return new DataRecord(entries, timeStamp); + } - case MVCC_DATA_RECORD: - entryCnt = in.readInt(); - timeStamp = in.readLong(); + case MVCC_DATA_RECORD: { + int entryCnt = in.readInt(); + long timeStamp = in.readLong(); - entries = new ArrayList<>(entryCnt); + List entries = new ArrayList<>(entryCnt); for (int i = 0; i < entryCnt; i++) entries.add(readMvccDataEntry(in)); return new MvccDataRecord(entries, timeStamp); + } - case ENCRYPTED_DATA_RECORD: - entryCnt = in.readInt(); - timeStamp = in.readLong(); + case ENCRYPTED_DATA_RECORD: { + int entryCnt = in.readInt(); + long timeStamp = in.readLong(); - entries = new ArrayList<>(entryCnt); + List entries = new ArrayList<>(entryCnt); for (int i = 0; i < entryCnt; i++) entries.add(readEncryptedDataEntry(in)); return new DataRecord(entries, timeStamp); + } case SNAPSHOT: long snpId = in.readLong(); From f9f1d85f9ab9949fcc52162f17487ef6afa06411 Mon Sep 17 00:00:00 2001 From: "Andrey V. Mashenkov" Date: Fri, 21 Dec 2018 15:49:39 +0300 Subject: [PATCH 3/8] IGNITE-10730: Minor. --- .../cache/transactions/IgniteTxAdapter.java | 38 ++++++++++++------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index f0e652a014649..d589828da4bea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -144,6 +144,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement private static final AtomicReferenceFieldUpdater TX_COUNTERS_UPD = AtomicReferenceFieldUpdater.newUpdater(IgniteTxAdapter.class, TxCounters.class, "txCounters"); + /** WAL MvccDataRecord batch size. */ public static final int MVCC_WAL_RECORD_BUFFER_SIZE = 20; /** Logger. */ @@ -282,8 +283,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement @GridToStringInclude protected volatile MvccSnapshot mvccSnapshot; + /** WAL data records buffer for enlisted entries. */ @GridToStringExclude - private List enlistBuffer; + private List enlistedRecordsForWal; /** Rollback finish future. */ @GridToStringExclude @@ -434,25 +436,35 @@ public void setParentTx(GridNearTxLocal parentTx) { this.mvccSnapshot = mvccSnapshot; } + /** + * Perform buffered logging to WAL for enlisted entries. + * + * @param rec Enlisted entry WAL record. + * @throws IgniteCheckedException If fails. + */ + public void logMvccEntry(MvccDataEntry rec) throws IgniteCheckedException { + if (cctx.wal() == null || isRollbackOnly()) + return; // May safely omit for roll-backed tx. - public void logMvccEntry(MvccDataEntry entry) throws IgniteCheckedException { - if (isRollbackOnly()) - return; // Noop. - - if (enlistBuffer == null) - enlistBuffer = new ArrayList<>(MVCC_WAL_RECORD_BUFFER_SIZE); + if (enlistedRecordsForWal == null) + enlistedRecordsForWal = new ArrayList<>(MVCC_WAL_RECORD_BUFFER_SIZE); - enlistBuffer.add(entry); + enlistedRecordsForWal.add(rec); - if (enlistBuffer.size() == MVCC_WAL_RECORD_BUFFER_SIZE) + if (enlistedRecordsForWal.size() == MVCC_WAL_RECORD_BUFFER_SIZE) flushEnlistBuffer(); } - public void flushEnlistBuffer() throws IgniteCheckedException { - if (!isRollbackOnly() && enlistBuffer != null) - cctx.wal().log(new MvccDataRecord(enlistBuffer)); + /** + * Force flush enlisted entries WAL records to WAL. + * @throws IgniteCheckedException If fails. + */ + void flushEnlistBuffer() throws IgniteCheckedException { + // No need to log garbage for rolled backed tx. + if (!isRollbackOnly() && enlistedRecordsForWal != null) + cctx.wal().log(new MvccDataRecord(enlistedRecordsForWal)); - enlistBuffer = null; + enlistedRecordsForWal = null; } /** From f77183908d5e641dfe3d0fd1d16394d5c3482337 Mon Sep 17 00:00:00 2001 From: "Andrey V. Mashenkov" Date: Fri, 21 Dec 2018 15:58:58 +0300 Subject: [PATCH 4/8] IGNITE-10730: Minor. --- .../cache/local/GridCacheLocalTxSingleThreadedSelfTest.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxSingleThreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxSingleThreadedSelfTest.java index 5727c57d15516..2bfd14c9889b5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxSingleThreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxSingleThreadedSelfTest.java @@ -24,16 +24,21 @@ import org.apache.ignite.testframework.MvccFeatureChecker; import org.apache.log4j.Level; import org.apache.log4j.Logger; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; import static org.apache.ignite.cache.CacheMode.LOCAL; /** * Tests for local transactions. */ +@RunWith(JUnit4.class) public class GridCacheLocalTxSingleThreadedSelfTest extends IgniteTxSingleThreadedAbstractTest { /** {@inheritDoc} */ @Override public void beforeTest() throws Exception { MvccFeatureChecker.failIfNotSupported(MvccFeatureChecker.Feature.LOCAL_CACHE); + + super.beforeTest(); } /** Cache debug flag. */ From d5cd89699bc86f39699ebcfb29f8a791a06144f6 Mon Sep 17 00:00:00 2001 From: "Andrey V. Mashenkov" Date: Fri, 21 Dec 2018 16:02:32 +0300 Subject: [PATCH 5/8] IGNITE-10730: Minor. --- .../distributed/dht/CacheGetReadFromBackupFailoverTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetReadFromBackupFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetReadFromBackupFailoverTest.java index 53d33b705a593..4243a115e283b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetReadFromBackupFailoverTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetReadFromBackupFailoverTest.java @@ -46,6 +46,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.MvccFeatureChecker; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Assert; import org.junit.Test; @@ -144,6 +145,9 @@ public int gridCount() { */ @Test public void testFailover() throws Exception { + if (MvccFeatureChecker.forcedMvcc()) + fail("https://issues.apache.org/jira/browse/IGNITE-10274"); + Ignite ignite = ignite(0); ignite.cluster().active(true); From e3557aeda3f963da3c6e669cd0bf3588ce911533 Mon Sep 17 00:00:00 2001 From: "Andrey V. Mashenkov" Date: Mon, 24 Dec 2018 15:22:01 +0300 Subject: [PATCH 6/8] IGNITE-10730: Minor. --- .../cache/transactions/IgniteTxAdapter.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index d589828da4bea..a4a301abde3fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -144,8 +144,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement private static final AtomicReferenceFieldUpdater TX_COUNTERS_UPD = AtomicReferenceFieldUpdater.newUpdater(IgniteTxAdapter.class, TxCounters.class, "txCounters"); - /** WAL MvccDataRecord batch size. */ - public static final int MVCC_WAL_RECORD_BUFFER_SIZE = 20; + /** Default WAL MvccDataRecord batch size. */ + private static final int DFLT_MVCC_ENLIST_WAL_BUFFER_SIZE = 20; /** Logger. */ protected static IgniteLogger log; @@ -285,7 +285,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement /** WAL data records buffer for enlisted entries. */ @GridToStringExclude - private List enlistedRecordsForWal; + private List enlistWalBuffer; /** Rollback finish future. */ @GridToStringExclude @@ -446,12 +446,12 @@ public void logMvccEntry(MvccDataEntry rec) throws IgniteCheckedException { if (cctx.wal() == null || isRollbackOnly()) return; // May safely omit for roll-backed tx. - if (enlistedRecordsForWal == null) - enlistedRecordsForWal = new ArrayList<>(MVCC_WAL_RECORD_BUFFER_SIZE); + if (enlistWalBuffer == null) + enlistWalBuffer = new ArrayList<>(DFLT_MVCC_ENLIST_WAL_BUFFER_SIZE); - enlistedRecordsForWal.add(rec); + enlistWalBuffer.add(rec); - if (enlistedRecordsForWal.size() == MVCC_WAL_RECORD_BUFFER_SIZE) + if (enlistWalBuffer.size() == DFLT_MVCC_ENLIST_WAL_BUFFER_SIZE) flushEnlistBuffer(); } @@ -461,10 +461,10 @@ public void logMvccEntry(MvccDataEntry rec) throws IgniteCheckedException { */ void flushEnlistBuffer() throws IgniteCheckedException { // No need to log garbage for rolled backed tx. - if (!isRollbackOnly() && enlistedRecordsForWal != null) - cctx.wal().log(new MvccDataRecord(enlistedRecordsForWal)); + if (!isRollbackOnly() && enlistWalBuffer != null) + cctx.wal().log(new MvccDataRecord(enlistWalBuffer)); - enlistedRecordsForWal = null; + enlistWalBuffer.clear(); } /** From 478fc7e3af5f9c8a38fb046462721c0ad3c81648 Mon Sep 17 00:00:00 2001 From: "Andrey V. Mashenkov" Date: Tue, 22 Jan 2019 16:39:19 +0300 Subject: [PATCH 7/8] IGNITE-10730: Fix data record buffer flushing. --- .../processors/cache/transactions/IgniteTxManager.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index e8d1a3f9c0a51..89d30a0f3cf65 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -2490,6 +2490,9 @@ record = new MvccTxRecord(tx.state(), tx.nearXidVersion(), tx.writeVersion(), no record = new TxRecord(tx.state(), tx.nearXidVersion(), tx.writeVersion(), nodes); try { + if (record.state() == PREPARED) + tx.flushEnlistBuffer(); + return cctx.wal().log(record); } catch (IgniteCheckedException e) { From 35a59fc7ec48c1b622559849aa4323ee68c314cc Mon Sep 17 00:00:00 2001 From: "Andrey V. Mashenkov" Date: Fri, 1 Mar 2019 17:26:39 +0300 Subject: [PATCH 8/8] IGNITE-10730: Minor. --- .../cache/transactions/IgniteTxAdapter.java | 23 ++++++++++++------- ...ridCacheLocalTxSingleThreadedSelfTest.java | 2 -- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 78604065b918d..654c48cd2d830 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -285,7 +285,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement /** WAL data records buffer for enlisted entries. */ @GridToStringExclude - private List enlistWalBuffer; + private volatile List enlistWalBuffer; /** {@code True} if tx should skip adding itself to completed version map on finish. */ private boolean skipCompletedVers; @@ -466,10 +466,12 @@ public void logMvccEntry(MvccDataEntry rec) throws IgniteCheckedException { if (enlistWalBuffer == null) enlistWalBuffer = new ArrayList<>(DFLT_MVCC_ENLIST_WAL_BUFFER_SIZE); - enlistWalBuffer.add(rec); + synchronized (enlistWalBuffer) { + enlistWalBuffer.add(rec); - if (enlistWalBuffer.size() == DFLT_MVCC_ENLIST_WAL_BUFFER_SIZE) - flushEnlistBuffer(); + if (enlistWalBuffer.size() == DFLT_MVCC_ENLIST_WAL_BUFFER_SIZE) + flushEnlistBuffer(); + } } /** @@ -477,11 +479,16 @@ public void logMvccEntry(MvccDataEntry rec) throws IgniteCheckedException { * @throws IgniteCheckedException If fails. */ void flushEnlistBuffer() throws IgniteCheckedException { - // No need to log garbage for rolled backed tx. - if (!isRollbackOnly() && enlistWalBuffer != null) - cctx.wal().log(new MvccDataRecord(enlistWalBuffer)); + if (enlistWalBuffer == null) + return; - enlistWalBuffer.clear(); + synchronized (enlistWalBuffer) { + // No need to log garbage for rolled backed tx. + if (!isRollbackOnly() && enlistWalBuffer != null) + cctx.wal().log(new MvccDataRecord(enlistWalBuffer)); + + enlistWalBuffer.clear(); + } } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxSingleThreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxSingleThreadedSelfTest.java index 6999049a00514..677dd8d15bb51 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxSingleThreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxSingleThreadedSelfTest.java @@ -36,8 +36,6 @@ public class GridCacheLocalTxSingleThreadedSelfTest extends IgniteTxSingleThread @Before public void beforeGridCacheLocalTxSingleThreadedSelfTest() { MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.LOCAL_CACHE); - - super.beforeTest(); } /** Cache debug flag. */